提交 87969990 编写于 作者: H Haojun Liao

[td-225]

上级 0ee8b40f
......@@ -294,6 +294,8 @@ uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -35,6 +35,7 @@ extern "C" {
#include "qExecutor.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qUtil.h"
#include "tcmdtype.h"
// forward declaration
......@@ -96,21 +97,6 @@ typedef struct STableMetaInfo {
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
} SSqlExpr;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
......@@ -290,7 +276,7 @@ typedef struct {
char * pRsp;
int32_t rspType;
int32_t rspLen;
uint64_t qhandle;
uint64_t qid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int32_t row;
......
......@@ -161,8 +161,8 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qhandle == 0 && numOfRows != 0) {
if ((pRes->qid == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qid == 0 && numOfRows != 0) {
tscError("qhandle is NULL");
} else {
pRes->code = numOfRows;
......@@ -210,10 +210,9 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
if (pRes->qid == 0) {
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
return;
}
......
......@@ -309,7 +309,7 @@ TAOS_ROW tscFetchRow(void *param) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 ||
if (pRes->qid == 0 ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
return NULL;
......@@ -905,7 +905,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
* free allocated resources and remove the SqlObj from sql query linked list
*/
pRes->qhandle = 0x1;
pRes->qid = 0x1;
pRes->numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_SHOW_CREATE_TABLE) {
pRes->code = tscProcessShowCreateTable(pSql);
......
......@@ -56,82 +56,83 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescriptor *pDesc) {
/*
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// todo merge with vnode side function
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->pOutput = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pExpr->functionId;
pCtx[i].order = pQueryInfo->order.order;
pCtx[i].functionId = pExpr->functionId;
// input buffer hold only one point data
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
SSchema *pSchema = getColumnModelSchema(pDesc->pColumnModel, i);
pCtx->pInput = pReducer->pTempBuffer->data + offset;
pCtx[i].inputType = pExpr->colType;
pCtx[i].inputBytes = pExpr->colBytes;
// input data format comes from pModel
pCtx->inputType = pSchema->type;
pCtx->inputBytes = pSchema->bytes;
pCtx[i].outputBytes = pExpr->resBytes;
pCtx[i].outputType = pExpr->resType;
// output data format yet comes from pCmd.
pCtx->outputBytes = pExpr->resBytes;
pCtx->outputType = pExpr->resType;
pCtx->size = 1;
pCtx->hasNull = true;
pCtx->currentStage = MERGE_STAGE;
pCtx[i].size = 1;
pCtx[i].hasNull = true;
pCtx[i].currentStage = MERGE_STAGE;
// for top/bottom function, the output of timestamp is the first column
int32_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pCtx->ptsOutputBuf = pReducer->pCtx[0].pOutput;
pCtx->param[2].i64 = pQueryInfo->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i64 = pQueryInfo->order.orderColId;
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
pCtx[i].param[2].i64 = pQueryInfo->order.order;
pCtx[i].param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx[i].param[1].i64 = pQueryInfo->order.orderColId;
} else if (functionId == TSDB_FUNC_APERCT) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx[i].param[0].i64 = pExpr->param[0].i64;
pCtx[i].param[0].nType = pExpr->param[0].nType;
} else if (functionId == TSDB_FUNC_BLKINFO) {
pCtx->param[0].i64 = pExpr->param[0].i64;
pCtx->param[0].nType = pExpr->param[0].nType;
pCtx->numOfParams = 1;
pCtx[i].param[0].i64 = pExpr->param[0].i64;
pCtx[i].param[0].nType = pExpr->param[0].nType;
pCtx[i].numOfParams = 1;
}
pCtx->interBufBytes = pExpr->interBytes;
pCtx->resultInfo = calloc(1, pCtx->interBufBytes + sizeof(SResultRowCellInfo));
pCtx->stableQuery = true;
pCtx[i].interBufBytes = pExpr->interBytes;
pCtx[i].resultInfo = calloc(1, pCtx[i].interBufBytes + sizeof(SResultRowCellInfo));
pCtx[i].stableQuery = true;
}
int16_t n = 0;
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
SQLFunctionCtx *pCtx = NULL;
SQLFunctionCtx *pCtx1 = NULL;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->resBytes;
pTagCtx[n++] = &pReducer->pCtx[i];
pTagCtx[n++] = &pCtx[i];
} else if ((aAggs[pExpr->functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
pCtx = &pReducer->pCtx[i];
pCtx1 = &pCtx[i];
}
}
if (n == 0 || pCtx == NULL) {
free(pTagCtx);
} else {
pCtx->tagInfo.pTagCtxList = pTagCtx;
pCtx->tagInfo.numOfTagCols = n;
pCtx->tagInfo.tagsLen = tagLen;
pCtx1->tagInfo.pTagCtxList = pTagCtx;
pCtx1->tagInfo.numOfTagCols = n;
pCtx1->tagInfo.tagsLen = tagLen;
}
}
static void setCtxInputOutputBuffer(SQueryInfo* pQueryInfo, SQLFunctionCtx *pCtx, SLocalMerger *pReducer,
tOrderDescriptor *pDesc) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx[i].pOutput = pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
// input buffer hold only one point data
int16_t offset = getColumnModelOffset(pDesc->pColumnModel, i);
pCtx[i].pInput = pReducer->pTempBuffer->data + offset;
}
}
......@@ -362,8 +363,8 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
pReducer->pTempBuffer->num = 0;
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pReducer, pDesc);
tsCreateSQLFunctionCtx(pQueryInfo, pReducer->pCtx);
setCtxInputOutputBuffer(pQueryInfo, pReducer->pCtx, pReducer, pDesc);
// we change the capacity of schema to denote that there is only one row in temp buffer
pReducer->pDesc->pColumnModel->capacity = 1;
......@@ -1607,7 +1608,7 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
tscDestroyLocalMerger(pObj);
}
pRes->qhandle = 1; // hack to pass the safety check in fetch_row function
pRes->qid = 1; // hack to pass the safety check in fetch_row function
pRes->numOfRows = 0;
pRes->row = 0;
......
......@@ -903,7 +903,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pRes->qhandle = 0;
pRes->qid = 0;
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
......
......@@ -250,7 +250,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pQdesc->queryId = htonl(pSql->queryId);
//pQdesc->useconds = htobe64(pSql->res.useconds);
pQdesc->useconds = htobe64(now - pSql->stime);
pQdesc->qHandle = htobe64(pSql->res.qhandle);
pQdesc->qid = htobe64(pSql->res.qid);
pHeartbeat->numOfQueries++;
pQdesc++;
......
......@@ -1490,9 +1490,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
SExprInfo* pArithExprInfo = calloc(1, sizeof(SExprInfo));
// arithmetic expression always return result in the format of double float
pArithExprInfo->bytes = sizeof(double);
pArithExprInfo->interBytes = sizeof(double);
pArithExprInfo->type = TSDB_DATA_TYPE_DOUBLE;
pArithExprInfo->base.resBytes = sizeof(double);
pArithExprInfo->base.interBytes = sizeof(double);
pArithExprInfo->base.resType = TSDB_DATA_TYPE_DOUBLE;
pArithExprInfo->base.functionId = TSDB_FUNC_ARITHM;
pArithExprInfo->base.numOfParams = 1;
......@@ -1517,10 +1517,10 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
// TODO: other error handling
} END_TRY
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
pFuncMsg->arg[0].argBytes = (int16_t) tbufTell(&bw);
pFuncMsg->arg[0].argValue.pz = tbufGetData(&bw, true);
pFuncMsg->arg[0].argType = TSDB_DATA_TYPE_BINARY;
SSqlExpr* pFuncMsg = &pInfo->pArithExprInfo->base;
pFuncMsg->param[0].nLen = (int16_t) tbufTell(&bw);
pFuncMsg->param[0].pz = tbufGetData(&bw, true);
pFuncMsg->param[0].nType = TSDB_DATA_TYPE_BINARY;
// tbufCloseWriter(&bw); // TODO there is a memory leak
}
......@@ -6631,6 +6631,10 @@ static STableMeta* extractTempTableMetaFromNestQuery(SQueryInfo* pUpstream) {
return meta;
}
//static SColumnInfo* getColumnInfoFromSchema(SQueryInfo* pUpstream) {
//
//}
int32_t validateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) {
assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0));
......
......@@ -510,8 +510,9 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd);
pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qid = htobe64(pSql->res.qid);
// todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -523,7 +524,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qid:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qid);
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
......@@ -531,12 +532,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qid:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qid);
}
} else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle);
tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qid:%" PRIu64, pSql, pTableMeta->vgId, pSql->res.qid);
}
pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
......@@ -592,7 +593,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlFuncMsg) * numOfExprs * 2);
int32_t exprSize = (int32_t)(sizeof(SSqlExpr) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
int32_t sqlLen = (int32_t) strlen(pSql->sqlstr) + 1;
......@@ -818,7 +819,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
SSqlFuncMsg *pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
SSqlExpr *pSqlFuncExpr = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -840,43 +842,41 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->colInfo.colIndex = htons(pExpr->colInfo.colIndex);
pSqlFuncExpr->colInfo.flag = htons(pExpr->colInfo.flag);
if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag)) {
pSqlFuncExpr->colType = htons(pExpr->resType);
pSqlFuncExpr->colBytes = htons(pExpr->resBytes);
} else if (pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
SSchema *s = tGetTbnameColumnSchema();
pSqlFuncExpr->colType = htons(pExpr->colType);
pSqlFuncExpr->colBytes = htons(pExpr->colBytes);
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
if (TSDB_COL_IS_UD_COL(pExpr->colInfo.flag) || pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
pSqlFuncExpr->resType = htons(pExpr->resType);
pSqlFuncExpr->resBytes = htons(pExpr->resBytes);
} else if (pExpr->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pSqlFuncExpr->colType = htons(s.type);
pSqlFuncExpr->colBytes = htons(s.bytes);
pSqlFuncExpr->resType = htons(s.type);
pSqlFuncExpr->resBytes = htons(s.bytes);
} else {
SSchema* s = tscGetColumnSchemaById(pTableMeta, pExpr->colInfo.colId);
pSqlFuncExpr->colType = htons(s->type);
pSqlFuncExpr->colBytes = htons(s->bytes);
pSqlFuncExpr->resType = htons(s->type);
pSqlFuncExpr->resBytes = htons(s->bytes);
}
pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId);
pMsg += sizeof(SSqlFuncMsg);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);
pSqlFuncExpr->param[j].nType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->param[j].nLen = htons(pExpr->param[j].nLen);
if (pExpr->param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExpr->param[j].pz, pExpr->param[j].nLen);
pMsg += pExpr->param[j].nLen;
} else {
pSqlFuncExpr->arg[j].argValue.i64 = htobe64(pExpr->param[j].i64);
pSqlFuncExpr->param[j].i64 = htobe64(pExpr->param[j].i64);
}
}
pSqlFuncExpr = (SSqlFuncMsg *)pMsg;
pSqlFuncExpr = (SSqlExpr *)pMsg;
}
size_t output = tscNumOfFields(pQueryInfo);
......@@ -884,7 +884,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (tscIsSecondStageQuery(pQueryInfo)) {
pQueryMsg->secondStageOutput = htonl((int32_t) output);
SSqlFuncMsg *pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
SSqlExpr *pExpr1 = (SSqlExpr *)pMsg;
for (int32_t i = 0; i < output; ++i) {
SInternalField* pField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
......@@ -904,49 +904,56 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pSqlFuncExpr1->numOfParams = 0; // no params for projection query
pSqlFuncExpr1->functionId = htons(TSDB_FUNC_PRJ);
pSqlFuncExpr1->colInfo.colId = htons(pExpr->resColId);
pSqlFuncExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
pExpr1->numOfParams = 0; // no params for projection query
pExpr1->functionId = htons(TSDB_FUNC_PRJ);
pExpr1->colInfo.colId = htons(pExpr->resColId);
pExpr1->colInfo.flag = htons(TSDB_COL_NORMAL);
pExpr1->colType = htons(pExpr->resType);
pExpr1->colBytes = htons(pExpr->resBytes);
bool assign = false;
for (int32_t f = 0; f < tscSqlExprNumOfExprs(pQueryInfo); ++f) {
SSqlExpr *pe = tscSqlExprGet(pQueryInfo, f);
if (pe == pExpr) {
pSqlFuncExpr1->colInfo.colIndex = htons(f);
pSqlFuncExpr1->colType = htons(pe->resType);
pSqlFuncExpr1->colBytes = htons(pe->resBytes);
pExpr1->colInfo.colIndex = htons(f);
pExpr1->resType = htons(pe->resType);
pExpr1->resBytes = htons(pe->resBytes);
assign = true;
break;
}
}
assert(assign);
pMsg += sizeof(SSqlFuncMsg);
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
pMsg += sizeof(SSqlExpr);
pExpr1 = (SSqlExpr *)pMsg;
} else {
assert(pField->pArithExprInfo != NULL);
SExprInfo* pExprInfo = pField->pArithExprInfo;
pSqlFuncExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId);
pSqlFuncExpr1->functionId = htons(pExprInfo->base.functionId);
pSqlFuncExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
pMsg += sizeof(SSqlFuncMsg);
pExpr1->colInfo.colId = htons(pExprInfo->base.colInfo.colId);
pExpr1->colType = htons(pExprInfo->base.colType);
pExpr1->colBytes = htons(pExprInfo->base.colBytes);
pExpr1->resBytes = htons(sizeof(double));
pExpr1->resType = htons(TSDB_DATA_TYPE_DOUBLE);
pExpr1->functionId = htons(pExprInfo->base.functionId);
pExpr1->numOfParams = htons(pExprInfo->base.numOfParams);
pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
// todo add log
pSqlFuncExpr1->arg[j].argType = htons((uint16_t)pExprInfo->base.arg[j].argType);
pSqlFuncExpr1->arg[j].argBytes = htons(pExprInfo->base.arg[j].argBytes);
pExpr1->param[j].nType = htons((uint16_t)pExprInfo->base.param[j].nType);
pExpr1->param[j].nLen = htons(pExprInfo->base.param[j].nLen);
if (pExprInfo->base.arg[j].argType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExprInfo->base.arg[j].argValue.pz, pExprInfo->base.arg[j].argBytes);
pMsg += pExprInfo->base.arg[j].argBytes;
if (pExprInfo->base.param[j].nType == TSDB_DATA_TYPE_BINARY) {
memcpy(pMsg, pExprInfo->base.param[j].pz, pExprInfo->base.param[j].nLen);
pMsg += pExprInfo->base.param[j].nLen;
} else {
pSqlFuncExpr1->arg[j].argValue.i64 = htobe64(pExprInfo->base.arg[j].argValue.i64);
pExpr1->param[j].i64 = htobe64(pExprInfo->base.param[j].i64);
}
}
pSqlFuncExpr1 = (SSqlFuncMsg *)pMsg;
pExpr1 = (SSqlExpr *)pMsg;
}
}
} else {
......@@ -1561,7 +1568,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pRetrieveMsg->qid = htobe64(pSql->res.qid);
pRetrieveMsg->free = htons(pQueryInfo->type);
return TSDB_CODE_SUCCESS;
......@@ -2107,7 +2114,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
pShow = (SShowRsp *)pRes->pRsp;
pShow->qhandle = htobe64(pShow->qhandle);
pRes->qhandle = pShow->qhandle;
pRes->qid = pShow->qhandle;
tscResetForNextRetrieve(pRes);
pMetaMsg = &(pShow->tableMeta);
......@@ -2289,10 +2296,11 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryTableRsp *pQuery = (SQueryTableRsp *)pRes->pRsp;
pQuery->qhandle = htobe64(pQuery->qhandle);
pRes->qhandle = pQuery->qhandle;
pQuery->qid = htobe64(pQuery->qid);
pRes->qid = pQuery->qid;
pRes->data = NULL;
tscResetForNextRetrieve(pRes);
return 0;
}
......
......@@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 ||
if (pRes->qid == 0 ||
pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
......@@ -508,7 +508,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 ||
if (pRes->qid == 0 ||
pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) {
......@@ -554,7 +554,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (pRes == NULL || pRes->qhandle == 0) {
if (pRes == NULL || pRes->qid == 0) {
return true;
}
......@@ -1050,7 +1050,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qhandle = 0;
pRes->qid = 0;
free(str);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -149,7 +149,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
}
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0;
pRes->qid = 0;
pRes->numOfRows = 1;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
......@@ -448,7 +448,7 @@ SSqlObj* recreateSqlObj(SSub* pSub) {
return NULL;
}
pRes->qhandle = 0;
pRes->qid = 0;
pRes->numOfRows = 1;
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
......@@ -546,7 +546,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
uint32_t type = pQueryInfo->type;
tscFreeSqlResult(pSql);
pRes->numOfRows = 1;
pRes->qhandle = 0;
pRes->qid = 0;
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
......
......@@ -1584,7 +1584,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pSql->res.qhandle = 0x1;
pSql->res.qid = 0x1;
assert(pSql->res.numOfRows == 0);
if (pSql->pSubs == NULL) {
......@@ -2179,7 +2179,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SColumnModel *pModel = NULL;
SColumnModel *pFinalModel = NULL;
pRes->qhandle = 0x1; // hack the qhandle check
pRes->qid = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 16u); // 64KB
......@@ -2727,7 +2727,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
if (pSql->res.qid == 0) { // qhandle is 0, code is TSDB_CODE_SUCCESS means no results generated from this vnode
tscRetrieveFromDnodeCallBack(param, pSql, 0);
} else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
......
......@@ -371,11 +371,39 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
}
}
static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* pTableCols) {
int32_t numOfCols = taosArrayGetSize(pTableCols);
SColumnInfo* pColInfo = calloc(numOfCols, sizeof(SColumnInfo));
SSchema *pSchema = pTableMeta->schema;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pTableCols, i);
int32_t index = pCol->colIndex.columnIndex;
pColInfo[i].type = pSchema[index].type;
pColInfo[i].bytes = pSchema[index].bytes;
pColInfo[i].colId = pSchema[index].colId;
}
return pColInfo;
}
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
if (pQueryInfo->pDownstream != NULL && taosArrayGetSize(pQueryInfo->pDownstream) > 0) {
// handle the following query process
SQueryInfo* px = taosArrayGetP(pQueryInfo->pDownstream, 0);
printf("%d\n", px->type);
SColumnInfo* colInfo = extractColumnInfoFromResult(px->pTableMetaInfo[0]->pTableMeta, px->colList);
int32_t numOfOutput = tscSqlExprNumOfExprs(px);
SExprInfo *exprInfo = NULL;
SQLFunctionCtx *pCtx = calloc(numOfOutput, sizeof(SQLFunctionCtx));
int32_t numOfCols = taosArrayGetSize(px->colList);
SQueriedTableInfo info = {.colList = colInfo, .numOfCols = numOfCols,};
/*int32_t code = */createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
tsCreateSQLFunctionCtx(px, pCtx);
}
}
......@@ -1095,11 +1123,9 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(pInfo->pArithExprInfo->pExpr, NULL);
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
SSqlExpr* pFuncMsg = &pInfo->pArithExprInfo->base;
for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) {
if (pFuncMsg->arg[j].argType == TSDB_DATA_TYPE_BINARY) {
tfree(pFuncMsg->arg[j].argValue.pz);
}
tVariantDestroy(&pFuncMsg->param[j]);
}
tfree(pInfo->pArithExprInfo);
......@@ -1125,20 +1151,33 @@ static SSqlExpr* doCreateSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCo
// set the correct columnIndex index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema* s = tGetTbnameColumnSchema();
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
pExpr->colBytes = s->bytes;
pExpr->colType = s->type;
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pExpr->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
pExpr->colBytes = s.bytes;
pExpr->colType = s.type;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
pExpr->colInfo.colId = pColIndex->columnIndex;
pExpr->colBytes = size;
pExpr->colType = type;
} else {
if (TSDB_COL_IS_TAG(colType)) {
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
pExpr->colInfo.colId = pSchema[pColIndex->columnIndex].colId;
pExpr->colBytes = pSchema[pColIndex->columnIndex].bytes;
pExpr->colType = pSchema[pColIndex->columnIndex].type;
tstrncpy(pExpr->colInfo.name, pSchema[pColIndex->columnIndex].name, sizeof(pExpr->colInfo.name));
} else if (pTableMetaInfo->pTableMeta != NULL) {
// in handling select database/version/server_status(), the pTableMeta is NULL
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pColIndex->columnIndex);
pExpr->colInfo.colId = pSchema->colId;
pExpr->colBytes = pSchema->bytes;
pExpr->colType = pSchema->type;
tstrncpy(pExpr->colInfo.name, pSchema->name, sizeof(pExpr->colInfo.name));
}
}
......@@ -1769,7 +1808,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId = -1000;
pQueryInfo->resColumnId = TSDB_RES_COL_ID;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
......@@ -2128,6 +2167,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pNewQueryInfo->command = pQueryInfo->command;
pnCmd->active = pNewQueryInfo;
memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval));
pNewQueryInfo->type = pQueryInfo->type;
pNewQueryInfo->window = pQueryInfo->window;
......
......@@ -41,6 +41,33 @@ typedef struct SResPair {
double avg;
} SResPair;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t colType; // table column type
int16_t colBytes; // table column bytes
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
} SSqlExpr;
typedef struct SExprInfo {
SSqlExpr base;
int64_t uid;
struct tExprNode* pExpr;
} SExprInfo;
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
......
......@@ -243,8 +243,9 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-100)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......
......@@ -398,35 +398,26 @@ typedef struct SColIndex {
} SColIndex;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
} SSqlFuncMsg;
typedef struct SExprInfo {
SSqlFuncMsg base;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
} SExprInfo;
* operations in select */
//typedef struct SSqlFuncMsg {
// int16_t functionId;
// int16_t numOfParams;
//
// int16_t resColId; // result column id, id of the current output column
// int16_t colType;
// int16_t colBytes;
//
// SColIndex colInfo;
// struct ArgElem {
// int16_t argType;
// int16_t argBytes;
// union {
// double d;
// int64_t i64;
// char * pz;
// } argValue;
// } arg[3];
//} SSqlFuncMsg;
typedef struct SColumnFilterInfo {
int16_t lowerRelOptr;
......@@ -513,12 +504,12 @@ typedef struct {
typedef struct {
int32_t code;
uint64_t qhandle; // query handle
uint64_t qid; // dnode generated query id
} SQueryTableRsp;
typedef struct {
SMsgHead header;
uint64_t qhandle;
union{uint64_t qid; uint64_t qhandle;};
uint16_t free;
} SRetrieveTableMsg;
......@@ -818,7 +809,7 @@ typedef struct {
uint32_t queryId;
int64_t useconds;
int64_t stime;
uint64_t qHandle;
uint64_t qid;
} SQueryDesc;
typedef struct {
......
......@@ -344,7 +344,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->bytes[cols] = 24;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "qhandle");
strcpy(pSchema[cols].name, "qid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -420,7 +420,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
cols++;
char handleBuf[24] = {0};
snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle));
snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qid));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
......
......@@ -306,6 +306,12 @@ enum {
QUERY_RESULT_READY = 2,
};
typedef struct {
int32_t numOfTags;
int32_t numOfCols;
SColumnInfo *colList;
} SQueriedTableInfo;
typedef struct SQInfo {
void* signature;
uint64_t qId;
......@@ -331,8 +337,8 @@ typedef struct SQueryParam {
char *tbnameCond;
char *prevResult;
SArray *pTableIdList;
SSqlFuncMsg **pExprMsg;
SSqlFuncMsg **pSecExprMsg;
SSqlExpr **pExpr;
SSqlExpr **pSecExpr;
SExprInfo *pExprs;
SExprInfo *pSecExprs;
......@@ -423,10 +429,11 @@ typedef struct SSWindowOperatorInfo {
void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
SColumnInfo* pTagCols);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlFuncMsg **pExprMsg, SExprInfo *prevExpr);
SSqlExpr **pExpr, SExprInfo *prevExpr);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
......
......@@ -27,7 +27,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery);
......
此差异已折叠。
......@@ -34,7 +34,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
int32_t size = 0;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
size += pQuery->pExpr1[i].interBytes;
size += pQuery->pExpr1[i].base.interBytes;
}
assert(size >= 0);
......@@ -139,7 +139,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) {
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
int16_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
int16_t size = pRuntimeEnv->pQuery->pExpr1[i].base.resType;
char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset);
memset(s, 0, size);
......
......@@ -14,7 +14,6 @@
*/
#include "os.h"
#include "qFill.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tglobal.h"
......@@ -23,13 +22,11 @@
#include "hash.h"
#include "texpr.h"
#include "qExecutor.h"
#include "qResultbuf.h"
#include "qUtil.h"
#include "query.h"
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
#include "tcompare.h"
typedef struct SQueryMgmt {
pthread_mutex_t lock;
......@@ -58,8 +55,8 @@ void freeParam(SQueryParam *param) {
tfree(param->tagCond);
tfree(param->tbnameCond);
tfree(param->pTableIdList);
tfree(param->pExprMsg);
tfree(param->pSecExprMsg);
// tfree(param->pExprMsg);
// tfree(param->pSecExprMsg);
tfree(param->pExprs);
tfree(param->pSecExprs);
tfree(param->pGroupColIndex);
......@@ -91,12 +88,14 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
goto _over;
}
if ((code = createQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->numOfOutput, &param.pExprs, param.pExprMsg, param.pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->colList};
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, &param.pExprs, param.pExpr, param.pTagColumnInfo,
pQueryMsg->queryType, pQueryMsg)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (param.pSecExprMsg != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExprMsg, param.pExprs)) != TSDB_CODE_SUCCESS) {
if (param.pSecExpr != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExpr, param.pExprs)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
......
......@@ -252,7 +252,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
pRsp->qhandle = 0;
pRsp->qid = 0;
pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp;
......@@ -270,7 +270,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
return pRsp->code;
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64(qId);
pRsp->qid = htobe64(qId);
}
if (handle != NULL &&
......@@ -343,37 +343,37 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
}
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
void * pCont = pRead->pCont;
SRspRet *pRet = &pRead->rspRet;
void *pCont = pRead->pCont;
SRspRet *pRet = &pRead->rspRet;
SRetrieveTableMsg *pRetrieve = pCont;
pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->qid = htobe64(pRetrieve->qid);
vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle,
vTrace("vgId:%d, qid:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qid,
pRetrieve->free, pRead->rpcHandle);
memset(pRet, 0, sizeof(SRspRet));
terrno = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
void **handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qid);
if (handle == NULL) {
code = terrno;
terrno = TSDB_CODE_SUCCESS;
} else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) {
} else if (!checkQIdEqual(*handle, pRetrieve->qid)) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
}
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle);
vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qid);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle);
vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qid, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -384,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qid, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -402,7 +402,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true;
} else { // result is not ready, return immediately
// Only effects in the non-blocking model
// Only affects the non-blocking model
if (!tsRetrieveBlockingModel) {
if (!buildRes) {
assert(pRead->rpcHandle != NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册