提交 2fa3c656 编写于 作者: S Shengliang Guan

merge from develop

...@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex); ...@@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex); SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList); void tscColumnListDestroy(SArray* pColList);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int32_t tscValidateName(SSQLToken* pToken); int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream); void tscIncStreamExecutionCount(void* pStream);
......
...@@ -32,8 +32,8 @@ extern "C" { ...@@ -32,8 +32,8 @@ extern "C" {
#include "qExecutor.h" #include "qExecutor.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h" #include "qtsbuf.h"
#include "tcmdtype.h"
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
...@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, ...@@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void *param, void **taos); void *param, void **taos);
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
...@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql); ...@@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
// todo remove this function.
bool tscResultsetFetchCompleted(TAOS_RES *result); bool tscResultsetFetchCompleted(TAOS_RES *result);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd); char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
......
...@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// handle the sub queries of join query // handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
} else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) { } else if (pRes->completed) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. if(pCmd->command == TSDB_SQL_FETCH) {
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
return; tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else { return;
/* } else {
/*
* all available virtual node has been checked already, now we need to check * all available virtual node has been checked already, now we need to check
* for the next subclause queries * for the next subclause queries
*/ */
if (pCmd->clauseIndex < pCmd->numOfClause - 1) { if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return; return;
} }
/* /*
* 1. has reach the limitation * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore * 2. no remain virtual nodes to be retrieved anymore
*/ */
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE) {
// in case of show command, return no data
(*pSql->fetchFp)(param, pSql, 0); (*pSql->fetchFp)(param, pSql, 0);
} else {
assert(0);
} }
return;
} else { // current query is not completed, continue retrieve from node } else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
...@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) { ...@@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) {
taos_free_result(pSql); taos_free_result(pSql);
} }
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
tscDebug("%p sqlObj put in queue to async free", pSql);
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncFree;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
......
...@@ -4452,6 +4452,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4452,6 +4452,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tVariantList* pVarList = pAlterSQL->varList; tVariantList* pVarList = pAlterSQL->varList;
tVariant* pTagName = &pVarList->a[0].pVar; tVariant* pTagName = &pVarList->a[0].pVar;
int16_t numOfTags = tscGetNumOfTags(pTableMeta);
SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER; SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER;
SSQLToken name = {.type = TK_STRING, .z = pTagName->pz, .n = pTagName->nLen}; SSQLToken name = {.type = TK_STRING, .z = pTagName->pz, .n = pTagName->nLen};
...@@ -4475,8 +4476,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4475,8 +4476,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
(pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) { (pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) {
return invalidSqlErrMsg(pQueryInfo->msg, msg14); return invalidSqlErrMsg(pQueryInfo->msg, msg14);
} }
int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE; int32_t schemaLen = sizeof(STColumn) * numOfTags;
int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + schemaLen + TSDB_EXTRA_PAYLOAD_SIZE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for alter table msg", pSql); tscError("%p failed to malloc for alter table msg", pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -4487,11 +4490,25 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4487,11 +4490,25 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->uid);
pUpdateMsg->colId = htons(pTagsSchema->colId); pUpdateMsg->colId = htons(pTagsSchema->colId);
pUpdateMsg->type = htons(pTagsSchema->type);
pUpdateMsg->bytes = htons(pTagsSchema->bytes);
pUpdateMsg->tversion = htons(pTableMeta->tversion); pUpdateMsg->tversion = htons(pTableMeta->tversion);
pUpdateMsg->numOfTags = htons(numOfTags);
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true); pUpdateMsg->schemaLen = htonl(schemaLen);
// the schema is located after the msg body, then followed by true tag value
char* d = pUpdateMsg->data;
SSchema* pTagCols = tscGetTableTagSchema(pTableMeta);
for (int i = 0; i < numOfTags; ++i) {
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pTagCols[i].colId);
pCol->bytes = htons(pTagCols[i].bytes);
pCol->type = pTagCols[i].type;
pCol->offset = 0;
d += sizeof(STColumn);
}
// copy the tag value to msg body
tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data + schemaLen, pTagsSchema->type, true);
int32_t len = 0; int32_t len = 0;
if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) { if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) {
...@@ -4502,7 +4519,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4502,7 +4519,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data
int32_t total = sizeof(SUpdateTableTagValMsg) + len; int32_t total = sizeof(SUpdateTableTagValMsg) + len + schemaLen;
pUpdateMsg->head.contLen = htonl(total); pUpdateMsg->head.contLen = htonl(total);
} else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) { } else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) {
......
...@@ -14,8 +14,8 @@ ...@@ -14,8 +14,8 @@
*/ */
#include "os.h" #include "os.h"
#include "qsqltype.h"
#include "tcache.h" #include "tcache.h"
#include "tcmdtype.h"
#include "trpc.h" #include "trpc.h"
#include "tscLocalMerge.h" #include "tscLocalMerge.h"
#include "tscLog.h" #include "tscLog.h"
......
...@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { ...@@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return taosArrayGetP(pColumnList, i); return taosArrayGetP(pColumnList, i);
} }
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) { static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) { for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) { if (pFilterInfo[i].filterstr) {
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_QSQLCMD_H #ifndef TDENGINE_TSQLMSGTYPE_H
#define TDENGINE_QSQLCMD_H #define TDENGINE_TSQLMSGTYPE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -109,4 +109,4 @@ extern char *sqlCmd[]; ...@@ -109,4 +109,4 @@ extern char *sqlCmd[];
} }
#endif #endif
#endif // TDENGINE_QSQLCMD_H #endif // TDENGINE_TSQLMSGTYPE_H
...@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema(); ...@@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema();
bool tscValidateTableNameLength(size_t len); bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
#endif // TDENGINE_NAME_H #endif // TDENGINE_NAME_H
...@@ -15,4 +15,4 @@ ...@@ -15,4 +15,4 @@
#define TSDB_SQL_C #define TSDB_SQL_C
#include "qsqltype.h" #include "tcmdtype.h"
...@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() { ...@@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() {
bool tscValidateTableNameLength(size_t len) { bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN; return len < TSDB_TABLE_NAME_LEN;
} }
\ No newline at end of file
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
return NULL;
}
SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo));
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
}
}
assert(src->filterstr == 0 || src->filterstr == 1);
assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID));
return pFilter;
}
...@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo); ...@@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo);
* @param qinfo * @param qinfo
* @return * @return
*/ */
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param); void qTableQuery(qinfo_t qinfo);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
......
...@@ -285,9 +285,9 @@ typedef struct { ...@@ -285,9 +285,9 @@ typedef struct {
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int16_t type;
int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[]; char data[];
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
......
...@@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo { ...@@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo {
} SSingleColumnFilterInfo; } SSingleColumnFilterInfo;
typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int32_t tableIndex;
int32_t groupIndex; // group id in table list
TSKEY lastKey; TSKEY lastKey;
int32_t numOfRes; int32_t groupIndex; // group id in table list
int16_t queryRangeSet; // denote if the query range is set, only available for interval query int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag; int64_t tag;
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
} STableQueryInfo; } STableQueryInfo;
...@@ -127,11 +124,6 @@ typedef struct SQueryCostInfo { ...@@ -127,11 +124,6 @@ typedef struct SQueryCostInfo {
uint64_t computTime; uint64_t computTime;
} SQueryCostInfo; } SQueryCostInfo;
//typedef struct SGroupItem {
// void *pTable;
// STableQueryInfo *info;
//} SGroupItem;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
...@@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv { ...@@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv {
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;
SQueryCostInfo summary; SQueryCostInfo summary;
bool stableQuery; // super table query or not
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file bool stableQuery; // super table query or not
bool topBotQuery; // false bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {
...@@ -205,7 +197,8 @@ typedef struct SQInfo { ...@@ -205,7 +197,8 @@ typedef struct SQInfo {
*/ */
int32_t tableIndex; int32_t tableIndex;
int32_t numOfGroupResultPages; int32_t numOfGroupResultPages;
_qinfo_free_fn_t fn; _qinfo_free_fn_t freeFn;
jmp_buf env;
} SQInfo; } SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "qfill.h" #include "qfill.h"
#include "hash.h" #include "hash.h"
...@@ -22,9 +23,8 @@ ...@@ -22,9 +23,8 @@
#include "qresultBuf.h" #include "qresultBuf.h"
#include "query.h" #include "query.h"
#include "queryLog.h" #include "queryLog.h"
#include "taosmsg.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscUtil.h" // todo move the function to common module #include "exception.h"
#include "tscompression.h" #include "tscompression.h"
#include "ttime.h" #include "ttime.h"
...@@ -87,6 +87,17 @@ typedef struct { ...@@ -87,6 +87,17 @@ typedef struct {
STSCursor cur; STSCursor cur;
} SQueryStatusInfo; } SQueryStatusInfo;
static UNUSED_FUNC void *u_malloc (size_t __size) {
// uint32_t v = rand();
// if (v % 5 <= 1) {
// return NULL;
// } else {
return malloc(__size);
// }
}
#define malloc u_malloc
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
...@@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
} }
int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
// int64_t maxOutput = 0;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
int32_t functionId = pQuery->pSelectExpr[j].base.functionId; int32_t functionId = pQuery->pSelectExpr[j].base.functionId;
...@@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { ...@@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) {
if (pResultInfo->numOfRes > 0) { if (pResultInfo->numOfRes > 0) {
return pResultInfo->numOfRes; return pResultInfo->numOfRes;
} }
// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) {
// maxOutput = pResultInfo->numOfRes;
//
// if (maxOutput > 0) {
// break;
// }
// }
//
// assert(pResultInfo != NULL);
} }
return 0; return 0;
...@@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
size_t size = taosArrayGetSize(pGroup); size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata; tFilePage **buffer = pQuery->sdata;
int32_t * posList = calloc(size, sizeof(int32_t));
int32_t* posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
if (pTableList == NULL || posList == NULL) {
tfree(posList);
tfree(pTableList);
qError("QInfo:%p failed alloc memory", pQInfo);
longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// todo opt for the case of one table per group // todo opt for the case of one table per group
int32_t numOfTables = 0; int32_t numOfTables = 0;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
...@@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { ...@@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
return pFillCol; return pFillCol;
} }
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery, void* freeParam, _qinfo_free_fn_t fn) { int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
...@@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
pQInfo->vgId = vgId; pQInfo->vgId = vgId;
pQInfo->param = freeParam;
pQInfo->fn = fn;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
pRuntimeEnv->pTSBuf = pTsBuf; pRuntimeEnv->pTSBuf = pTsBuf;
...@@ -4932,14 +4938,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { ...@@ -4932,14 +4938,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time // record the total elapsed time
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
} }
static void stableQueryImpl(SQInfo *pQInfo) { static void stableQueryImpl(SQInfo *pQInfo) {
...@@ -4961,10 +4959,6 @@ static void stableQueryImpl(SQInfo *pQInfo) { ...@@ -4961,10 +4959,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
// record the total elapsed time // record the total elapsed time
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st); pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
}
} }
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
...@@ -5076,6 +5070,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p ...@@ -5076,6 +5070,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p
*/ */
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr,
char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) { char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) {
int32_t code = TSDB_CODE_SUCCESS;
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
...@@ -5102,7 +5098,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5102,7 +5098,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// query msg safety check // query msg safety check
if (!validateQueryMsg(pQueryMsg)) { if (!validateQueryMsg(pQueryMsg)) {
return TSDB_CODE_QRY_INVALID_MSG; code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
} }
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
...@@ -5174,7 +5171,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5174,7 +5171,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int16_t functionId = pExprMsg->functionId; int16_t functionId = pExprMsg->functionId;
if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) { if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) {
if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression. if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression.
return TSDB_CODE_QRY_INVALID_MSG; code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
} }
} else { } else {
// if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { // if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) {
...@@ -5186,6 +5184,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5186,6 +5184,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
} }
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
code = TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup; goto _cleanup;
} }
...@@ -5193,6 +5192,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5193,6 +5192,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
*groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex)); *groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex));
if (*groupbyCols == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) {
(*groupbyCols)[i].colId = *(int16_t *)pMsg; (*groupbyCols)[i].colId = *(int16_t *)pMsg;
...@@ -5248,7 +5251,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5248,7 +5251,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (*pMsg != 0) { if (*pMsg != 0) {
size_t len = strlen(pMsg) + 1; size_t len = strlen(pMsg) + 1;
*tbnameCond = malloc(len); *tbnameCond = malloc(len);
if (*tbnameCond == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
strcpy(*tbnameCond, pMsg); strcpy(*tbnameCond, pMsg);
pMsg += len; pMsg += len;
} }
...@@ -5258,7 +5267,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5258,7 +5267,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
return 0;
return TSDB_CODE_SUCCESS;
_cleanup: _cleanup:
tfree(*pExpr); tfree(*pExpr);
...@@ -5268,7 +5278,8 @@ _cleanup: ...@@ -5268,7 +5278,8 @@ _cleanup:
tfree(*groupbyCols); tfree(*groupbyCols);
tfree(*tagCols); tfree(*tagCols);
tfree(*tagCond); tfree(*tagCond);
return TSDB_CODE_QRY_INVALID_MSG;
return code;
} }
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
...@@ -5656,7 +5667,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5656,7 +5667,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window);
item->groupIndex = i; item->groupIndex = i;
item->tableIndex = tableIndex++;
taosArrayPush(p1, &item); taosArrayPush(p1, &item);
taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
} }
...@@ -5670,7 +5680,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5670,7 +5680,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQuery->window = pQueryMsg->window; pQuery->window = pQueryMsg->window;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); int32_t code = TAOS_SYSTEM_ERROR(errno);
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, tstrerror(code));
goto _cleanup; goto _cleanup;
} }
...@@ -5681,7 +5692,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5681,7 +5692,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
_cleanup: _cleanup:
freeQInfo(pQInfo); freeQInfo(pQInfo);
return NULL; return NULL;
} }
...@@ -5723,6 +5733,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5723,6 +5733,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pQInfo->param = param;
pQInfo->freeFn = fn;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
...@@ -5732,7 +5745,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ ...@@ -5732,7 +5745,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
} }
// filter the qualified // filter the qualified
if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable, param, fn)) != TSDB_CODE_SUCCESS) { if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -6032,19 +6045,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) { ...@@ -6032,19 +6045,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) {
qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref);
if (ref == 0) { if (ref == 0) {
_qinfo_free_fn_t fn = pQInfo->fn; _qinfo_free_fn_t freeFp = pQInfo->freeFn;
void* param = pQInfo->param; void* param = pQInfo->param;
doDestoryQueryInfo(pQInfo); doDestoryQueryInfo(pQInfo);
if (fn != NULL) { if (freeFp != NULL) {
assert(param != NULL); assert(param != NULL);
fn(param); freeFp(param);
} }
} }
} }
void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { void qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) {
...@@ -6054,17 +6067,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { ...@@ -6054,17 +6067,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo); qDebug("QInfo:%p it is already killed, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
return; return;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo); qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
int32_t ret = setjmp(pQInfo->env);
// error occurs, record the error code and return to client
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return; return;
} }
qDebug("QInfo:%p query task is launched", pQInfo); qDebug("QInfo:%p query task is launched", pQInfo);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
assert(pQInfo->runtimeEnv.pQueryHandle == NULL); assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
buildTagQueryResult(pQInfo); // todo support the limit/offset buildTagQueryResult(pQInfo); // todo support the limit/offset
...@@ -6074,6 +6104,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { ...@@ -6074,6 +6104,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
tableQueryImpl(pQInfo); tableQueryImpl(pQInfo);
} }
SQuery* pQuery = pRuntimeEnv->pQuery;
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:%p query is killed", pQInfo);
} else if (pQuery->rec.rows == 0) {
qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total);
} else {
qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
} }
......
...@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { ...@@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
size_t len = strlen(cond) + VARSTR_HEADER_SIZE; size_t len = strlen(cond) + VARSTR_HEADER_SIZE;
char* p = exception_malloc(len); char* p = exception_malloc(len);
varDataSetLen(p, len - VARSTR_HEADER_SIZE); STR_WITH_SIZE_TO_VARSTR(p, cond, len - VARSTR_HEADER_SIZE);
memcpy(varDataVal(p), cond, len);
taosArrayPush(pVal->arr, &p); taosArrayPush(pVal->arr, &p);
} }
......
...@@ -15,16 +15,16 @@ ...@@ -15,16 +15,16 @@
#include "os.h" #include "os.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "queryLog.h"
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcmdtype.h"
#include "tglobal.h" #include "tglobal.h"
#include "tstoken.h" #include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h" #include "ttime.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tutil.h" #include "tutil.h"
#include "qsqltype.h"
#include "tstrbuild.h"
#include "queryLog.h"
SSqlInfo qSQLParse(const char *pStr) { SSqlInfo qSQLParse(const char *pStr) {
void *pParser = ParseAlloc(malloc); void *pParser = ParseAlloc(malloc);
......
...@@ -25,17 +25,17 @@ ...@@ -25,17 +25,17 @@
#include <stdio.h> #include <stdio.h>
/************ Begin %include sections from the grammar ************************/ /************ Begin %include sections from the grammar ************************/
#include <assert.h>
#include <stdbool.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "tutil.h"
#include "qsqlparser.h" #include "qsqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h" #include "tstoken.h"
#include "tvariant.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "qsqltype.h" #include "tutil.h"
#include "tvariant.h"
/**************** End of %include directives **********************************/ /**************** End of %include directives **********************************/
/* These constants specify the various numeric values for terminal symbols /* These constants specify the various numeric values for terminal symbols
** in a format understandable to "makeheaders". This section is blank unless ** in a format understandable to "makeheaders". This section is blank unless
......
...@@ -255,17 +255,46 @@ _err: ...@@ -255,17 +255,46 @@ _err:
return NULL; return NULL;
} }
static int32_t colIdCompar(const void* left, const void* right) {
int16_t colId = *(int16_t*) left;
STColumn* p2 = (STColumn*) right;
if (colId == p2->colId) {
return 0;
}
return (colId < p2->colId)? -1:1;
}
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
int16_t tversion = htons(pMsg->tversion);
STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); pMsg->uid = htobe64(pMsg->uid);
pMsg->tid = htonl(pMsg->tid);
pMsg->tversion = htons(pMsg->tversion);
pMsg->colId = htons(pMsg->colId);
pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen);
assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags);
char* d = pMsg->data;
for(int32_t i = 0; i < pMsg->numOfTags; ++i) {
STColumn* pCol = (STColumn*) d;
pCol->colId = htons(pCol->colId);
pCol->bytes = htonl(pCol->bytes);
assert(pCol->offset == 0);
d += sizeof(STColumn);
}
STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid);
if (pTable == NULL) { if (pTable == NULL) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
if (TABLE_TID(pTable) != htonl(pMsg->tid)) { if (TABLE_TID(pTable) != pMsg->tid) {
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return -1; return -1;
} }
...@@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
return -1; return -1;
} }
if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) { if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) {
tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo),
schemaVersion(tsdbGetTableTagSchema(pTable)), tversion); schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion);
void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid);
if (msg == NULL) return -1; if (msg == NULL) return -1;
// Deal with error her // Deal with error her
...@@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { ...@@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); STSchema *pTagSchema = tsdbGetTableTagSchema(pTable);
if (schemaVersion(pTagSchema) > tversion) { if (schemaVersion(pTagSchema) > pMsg->tversion) {
tsdbError( tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d", "version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tversion, schemaVersion(pTable->tagSchema)); REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
} }
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
tsdbRemoveTableFromIndex(pMeta, pTable); tsdbRemoveTableFromIndex(pMeta, pTable);
} }
// TODO: remove table from index if it is the first column of tag // TODO: remove table from index if it is the first column of tag
tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { // TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId
STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar);
assert(res != NULL);
tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen);
if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) {
tsdbAddTableIntoIndex(pMeta, pTable); tsdbAddTableIntoIndex(pMeta, pTable);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -74,9 +74,6 @@ typedef struct STableCheckInfo { ...@@ -74,9 +74,6 @@ typedef struct STableCheckInfo {
SDataCols* pDataCols; SDataCols* pDataCols;
int32_t chosen; // indicate which iterator should move forward int32_t chosen; // indicate which iterator should move forward
bool initBuf; // whether to initialize the in-memory skip list iterator or not bool initBuf; // whether to initialize the in-memory skip list iterator or not
SMemTable* mem; // in-mem buffer, hold the ref count
SMemTable* imem; // imem buffer, hold the ref count to avoid release
SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iter; // mem buffer skip list iterator
SSkipListIterator* iiter; // imem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator
} STableCheckInfo; } STableCheckInfo;
...@@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle { ...@@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle {
SFileGroupIter fileIter; SFileGroupIter fileIter;
SRWHelper rhelper; SRWHelper rhelper;
STableBlockInfo* pDataBlockInfo; STableBlockInfo* pDataBlockInfo;
SMemTable* mem; // mem-table
SMemTable* imem; // imem-table, acquired from snapshot
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
...@@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { ...@@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) {
} }
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order; pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow; pQueryHandle->window = pCond->twindow;
...@@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
...@@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
pCheckInfo->initBuf = true; pCheckInfo->initBuf = true;
int32_t order = pHandle->order; int32_t order = pHandle->order;
tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem); // tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem);
// no data in buffer, abort // no data in buffer, abort
if (pCheckInfo->mem == NULL && pCheckInfo->imem == NULL) { if (pHandle->mem == NULL && pHandle->imem == NULL) {
return false; return false;
} }
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
if (pCheckInfo->mem && pCheckInfo->mem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pCheckInfo->mem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
if (pCheckInfo->imem && pCheckInfo->imem->tData[pCheckInfo->tableId.tid] != NULL) { if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pCheckInfo->imem->tData[pCheckInfo->tableId.tid]->pData, pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
} }
...@@ -2319,9 +2316,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2319,9 +2316,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter); tSkipListDestroyIter(pTableCheckInfo->iter);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem);
if (pTableCheckInfo->pDataCols != NULL) { if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf); tfree(pTableCheckInfo->pDataCols->buf);
} }
...@@ -2341,9 +2335,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2341,9 +2335,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosArrayDestroy(pQueryHandle->pColumns); taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis); tfree(pQueryHandle->statis);
// todo check error
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
tsdbDestroyHelper(&pQueryHandle->rhelper); tsdbDestroyHelper(&pQueryHandle->rhelper);
tfree(pQueryHandle); tfree(pQueryHandle);
} }
......
...@@ -555,7 +555,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { ...@@ -555,7 +555,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) {
pCacheObj->freeFp(pElem->pData->data); pCacheObj->freeFp(pElem->pData->data);
} }
uError("free obj:%p", pElem->pData);
free(pElem->pData); free(pElem->pData);
free(pElem); free(pElem);
} }
......
...@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) {
assert(refCount >= 0); assert(refCount >= 0);
if (refCount > 0) { if (refCount > 0) {
vTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount); vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
return; return;
} }
...@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) { ...@@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
if (pVnode == NULL) return pVnode; if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
return pVnode; return pVnode;
} }
...@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { ...@@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
// release local resources only after cutting off outside connections // release local resources only after cutting off outside connections
taosCacheCleanup(pVnode->qHandlePool);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// this message arrived here by means of the *query* message, so release the vnode is necessary // this message arrived here by means of the *query* message, so release the vnode is necessary
void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
if (qhandle == NULL || *qhandle == NULL) { // todo handle invalid qhandle error if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
} else { } else {
// qKillQuery((qinfo_t) killQueryMsg->qhandle);
taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
} }
vnodeRelease(pVnode); vnodeRelease(pVnode);
return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code return TSDB_CODE_TSC_QUERY_CANCELLED;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->code = code;
pRsp->code = code; pRsp->qhandle = 0;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
...@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
} else { } else {
assert(pQInfo == NULL); assert(pQInfo == NULL);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
pQInfo = pCont; pQInfo = *(void**)(pCont);
handle = pCont;
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query qTableQuery(pQInfo); // do execute query
assert(handle != NULL);
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
} }
...@@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void **pQInfo = (void*) htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t ret = 0; int32_t ret = 0;
void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, &pQInfo, sizeof(pQInfo)); void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
if (handle == NULL || *handle != pQInfo) { if (handle == NULL || handle != pQInfo) {
ret = TSDB_CODE_QRY_INVALID_QHANDLE; ret = TSDB_CODE_QRY_INVALID_QHANDLE;
} }
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); if (ret == TSDB_CODE_SUCCESS) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
taosCacheRelease(pVnode->qHandlePool, handle, true);
// int32_t ret = qKillQuery(pQInfo);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
pRet->len = sizeof(SRetrieveTableRsp); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp; SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0; pRsp->numOfRows = 0;
pRsp->completed = true; pRsp->completed = true;
pRsp->useconds = 0; pRsp->useconds = 0;
} else { // todo handle error
}
return ret; return ret;
} }
vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
int32_t code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(*pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(*pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = handle;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
// qDestroyQueryInfo(pQInfo);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册