提交 6e70d237 编写于 作者: S slguan

Merge branch '2.0' into refact/slguan

...@@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1238,22 +1238,11 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _clean; goto _clean;
} }
// submit to more than one vnode
if (pCmd->pDataBlocks->nSize > 0) { if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgId // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean; goto _error_clean;
} }
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
// set the next sent data vnode index in data block arraylist
pTableMetaInfo->vnodeIndex = 1;
} else { } else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
} }
......
...@@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, ...@@ -1489,11 +1489,9 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes); SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, resColIdx, functionID, pColIndex, type, bytes, bytes);
strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName)); strncpy(pExpr->aliasName, columnName, tListLen(pExpr->aliasName));
// for point interpolation/last_row query, we need the timestamp column to be loaded // for all querie, the timestamp column meeds to be loaded
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
if (functionID == TSDB_FUNC_INTERP || functionID == TSDB_FUNC_LAST_ROW) { tscColumnBaseInfoInsert(pQueryInfo, &index);
tscColumnBaseInfoInsert(pQueryInfo, &index);
}
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr); insertResultField(pQueryInfo, resColIdx, &ids, bytes, type, columnName, pExpr);
...@@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1581,7 +1579,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
} }
} }
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
case TK_SUM: case TK_SUM:
...@@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1689,7 +1690,10 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i])); tscColumnBaseInfoInsert(pQueryInfo, &(ids.ids[i]));
} }
} }
SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnBaseInfoInsert(pQueryInfo, &tsCol);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
case TK_FIRST: case TK_FIRST:
...@@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1708,7 +1712,6 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
} }
/* in first/last function, multiple columns can be add to resultset */ /* in first/last function, multiple columns can be add to resultset */
for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) { for (int32_t i = 0; i < pItem->pNode->pParam->nExpr; ++i) {
tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]); tSQLExprItem* pParamElem = &(pItem->pNode->pParam->a[i]);
if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) { if (pParamElem->pNode->nSQLOptr != TK_ALL && pParamElem->pNode->nSQLOptr != TK_ID) {
...@@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1753,7 +1756,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
} }
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // select * from xxx } else { // select * from xxx
int32_t numOfFields = 0; int32_t numOfFields = 0;
...@@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1773,6 +1776,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta); numOfFields += tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1891,6 +1895,8 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
default: default:
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
} }
// todo refactor // todo refactor
......
...@@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -341,14 +341,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
(*pSql->fp)(pSql->param, taosres, rpcMsg->code); (*pSql->fp)(pSql->param, taosres, rpcMsg->code);
if (shouldFree) { if (shouldFree) {
// If it is failed, all objects allocated during execution taos_connect_a should be released tscFreeSqlObj(pSql);
if (command == TSDB_SQL_CONNECT) { tscTrace("%p Async sql is automatically freed", pSql);
taos_close(pObj);
tscTrace("%p Async sql close failed connection", pSql);
} else {
tscFreeSqlObj(pSql);
tscTrace("%p Async sql is automatically freed", pSql);
}
} }
} }
......
...@@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -594,11 +594,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
} }
if (numOfTableHasRes >= 2) { // do merge result if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL); success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
} else { // only one subquery } else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0]; SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -674,14 +670,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || if (pRes->qhandle == 0 ||
pRes->completed ||
pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pCmd->command == TSDB_SQL_INSERT) { pCmd->command == TSDB_SQL_INSERT) {
return NULL; return NULL;
} }
// current data are exhausted, fetch more data // current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
......
...@@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p ...@@ -504,7 +504,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
} }
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
SSqlInfo SQLInfo = {0}; SSqlInfo SQLInfo = {0};
tSQLParse(&SQLInfo, pSql->sqlstr); tSQLParse(&SQLInfo, pSql->sqlstr);
......
...@@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) { ...@@ -423,9 +423,6 @@ void tscFreeResData(SSqlObj* pSql) {
} }
void tscFreeSqlResult(SSqlObj* pSql) { void tscFreeSqlResult(SSqlObj* pSql) {
//TODO not free
return;
tfree(pSql->res.pRsp); tfree(pSql->res.pRsp);
pSql->res.row = 0; pSql->res.row = 0;
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
...@@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { ...@@ -469,8 +466,6 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
tscFreeSqlCmdData(pCmd); tscFreeSqlCmdData(pCmd);
tscTrace("%p free sqlObj partial completed", pSql); tscTrace("%p free sqlObj partial completed", pSql);
tscFreeSqlCmdData(pCmd);
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
...@@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -489,10 +484,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pCmd->allocSize = 0; pCmd->allocSize = 0;
if (pSql->fp == NULL) { tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->rspSem);
tsem_destroy(&pSql->emptyRspSem);
}
free(pSql); free(pSql);
} }
...@@ -721,7 +713,7 @@ static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { ...@@ -721,7 +713,7 @@ static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
SSubmitBlk* pBlock = pTableDataBlock->pData; SSubmitBlk* pBlock = (SSubmitBlk*) pTableDataBlock->pData;
int32_t rows = htons(pBlock->numOfRows); int32_t rows = htons(pBlock->numOfRows);
for(int32_t i = 0; i < rows; ++i) { for(int32_t i = 0; i < rows; ++i) {
...@@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { ...@@ -1751,16 +1743,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
} }
int32_t command = pSql->cmd.command; int32_t command = pSql->cmd.command;
if (pTscObj->pSql == pSql) { if (command == TSDB_SQL_CONNECT) {
/* return true;
* in case of taos_connect_a query, the object should all be released, even it is the
* master sql object. Otherwise, the master sql should not be released
*/
if (command == TSDB_SQL_CONNECT && pSql->res.code != TSDB_CODE_SUCCESS) {
return true;
}
return false;
} }
if (command == TSDB_SQL_INSERT) { if (command == TSDB_SQL_INSERT) {
......
...@@ -176,10 +176,10 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -176,10 +176,10 @@ static void *dnodeProcessReadQueue(void *param) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
dnodeProcessReadResult(pVnode, pReadMsg); // dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
dnodeReleaseVnode(pVnode); dnodeReleaseVnode(pVnode);
} }
return NULL; return NULL;
...@@ -220,7 +220,7 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { ...@@ -220,7 +220,7 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
code = terrno; code = terrno;
} }
//TODO: query handle is returned by dnodeProcessQueryMsg //TODO: query handle is returned by dnodeProcessQueryMsg
if (0) { if (0) {
SRpcMsg rsp; SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle; rsp.handle = pRead->rpcMsg.handle;
...@@ -232,47 +232,67 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { ...@@ -232,47 +232,67 @@ static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
} }
static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// SReadMsg readMsg = {
// .rpcMsg = {0},
// .pCont = qhandle,
// .contLen = 0,
// .pRpcContext = pMsg->pRpcContext,
// };
//
// taos_queue queue = dnodeGetVnodeRworker(pVnode);
// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL; SQInfo* pQInfo = NULL;
void* tsdb = dnodeGetVnodeTsdb(pVnode); if (pMsg->contLen != 0) {
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); void* tsdb = dnodeGetVnodeTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->qhandle = htobe64((uint64_t) (pQInfo));
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = code,
.msgType = 0
};
rpcSendResponse(&rpcRsp); SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.pCont = pRsp,
.contLen = sizeof(SQueryTableRsp),
.code = code,
.msgType = 0
};
// do execute query rpcSendResponse(&rpcRsp);
qTableQuery(pQInfo); } else {
pQInfo = pMsg->pCont;
}
qTableQuery(pQInfo); // do execute query
} }
static int32_t c = 0;
static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
if ((++c)%2 == 0) {
int32_t k = 1;
}
int32_t rowSize = 0;
int32_t numOfRows = 0;
int32_t contLen = 0; int32_t contLen = 0;
SRetrieveTableRsp *pRsp = NULL; SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
contLen = sizeof(SRetrieveTableRsp); contLen = sizeof(SRetrieveTableRsp);
...@@ -281,6 +301,12 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -281,6 +301,12 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg);
}
} }
SRpcMsg rpcRsp = (SRpcMsg) { SRpcMsg rpcRsp = (SRpcMsg) {
...@@ -292,7 +318,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -292,7 +318,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
//todo merge result should be done here
//dnodeProcessReadResult(&readMsg);
} }
...@@ -68,8 +68,10 @@ typedef struct SWindowResult { ...@@ -68,8 +68,10 @@ typedef struct SWindowResult {
} SWindowResult; } SWindowResult;
typedef struct SResultRec { typedef struct SResultRec {
int64_t pointsTotal; int64_t total;
int64_t pointsRead; int64_t size;
int64_t capacity;
int32_t threshold; // the threshold size, when the number of rows in result buffer, return to client
} SResultRec; } SResultRec;
typedef struct SWindowResInfo { typedef struct SWindowResInfo {
...@@ -112,7 +114,7 @@ typedef struct STableQueryInfo { ...@@ -112,7 +114,7 @@ typedef struct STableQueryInfo {
typedef struct STableDataInfo { typedef struct STableDataInfo {
int32_t numOfBlocks; int32_t numOfBlocks;
int32_t start; // start block index int32_t start; // start block index
int32_t tableIndex; int32_t tableIndex;
void* pMeterObj; void* pMeterObj;
int32_t groupIdx; // group id in table list int32_t groupIdx; // group id in table list
...@@ -143,7 +145,6 @@ typedef struct SQuery { ...@@ -143,7 +145,6 @@ typedef struct SQuery {
int32_t pos; int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data int64_t pointsOffset; // the number of points offset to save read data
SData** sdata; SData** sdata;
int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
...@@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv { ...@@ -171,15 +172,13 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
void* pVnode; // void* param; // pointer to the RpcReadMsg
TSKEY startTime; TSKEY startTime;
TSKEY elapsedTime; TSKEY elapsedTime;
SResultRec rec;
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
// int32_t killed; // denotes if current query is killed
sem_t dataReady; sem_t dataReady;
SArray* pTableIdList; // table list SArray* pTableIdList; // table id list
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t subgroupIdx; int32_t subgroupIdx;
int32_t offset; /* offset in group result set of subgroup */ int32_t offset; /* offset in group result set of subgroup */
...@@ -204,7 +203,7 @@ typedef struct SQInfo { ...@@ -204,7 +203,7 @@ typedef struct SQInfo {
* @param pQInfo * @param pQInfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo);
/** /**
* query on single table * query on single table
...@@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg); ...@@ -222,7 +221,7 @@ void qSuperTableQuery(void* pReadMsg);
* wait for the query completed, and retrieve final results to client * wait for the query completed, and retrieve final results to client
* @param pQInfo * @param pQInfo
*/ */
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize); int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo);
/** /**
* *
...@@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro ...@@ -232,4 +231,11 @@ int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* ro
*/ */
int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen); int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
/**
*
* @param pQInfo
* @return
*/
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
此差异已折叠。
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "rpcServer.h" #include "rpcServer.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "trpc.h" #include "trpc.h"
#include "hash.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
...@@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) {
} }
if (pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); // pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
if (pRpc->hash == NULL) { if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label); tError("%s failed to init string hash", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -292,7 +294,8 @@ void rpcClose(void *param) { ...@@ -292,7 +294,8 @@ void rpcClose(void *param) {
} }
} }
taosCleanUpStrHash(pRpc->hash); // taosCleanUpStrHash(pRpc->hash);
taosHashCleanup(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl); taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool); taosIdPoolCleanUp(pRpc->idPool);
rpcCloseConnCache(pRpc->pCache); rpcCloseConnCache(pRpc->pCache);
...@@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) { ...@@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); // taosDeleteStrHash(pRpc->hash, hashstr);
// taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
pConn->inType = 0; pConn->inType = 0;
...@@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); // SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn; if (ppConn) pConn = *ppConn;
if (pConn) return pConn; if (pConn) return pConn;
...@@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->localPort = (pRpc->localPort + pRpc->index); pConn->localPort = (pRpc->localPort + pRpc->index);
} }
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); // taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort); pRpc->label, pConn, sid, pConn->user, pConn->localPort);
} }
......
...@@ -5312,7 +5312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5312,7 +5312,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (1) { while (1) {
// check if query is killed or not set the status of query to pass the status check // check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return cnt; return cnt;
} }
...@@ -6375,7 +6375,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -6375,7 +6375,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->scanFlag = REPEAT_SCAN; pRuntimeEnv->scanFlag = REPEAT_SCAN;
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
......
...@@ -105,7 +105,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo ...@@ -105,7 +105,7 @@ static void queryOnMultiDataCache(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
int32_t start = pSupporter->pSidSet->starterPos[groupIdx]; int32_t start = pSupporter->pSidSet->starterPos[groupIdx];
int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1; int32_t end = pSupporter->pSidSet->starterPos[groupIdx + 1] - 1;
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
return; return;
} }
...@@ -276,7 +276,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo ...@@ -276,7 +276,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (1) { while (1) {
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
break; break;
} }
...@@ -363,7 +363,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo ...@@ -363,7 +363,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1; int32_t j = QUERY_IS_ASC_QUERY(pQuery) ? 0 : numOfBlocks - 1;
for (; j < numOfBlocks && j >= 0; j += step) { for (; j < numOfBlocks && j >= 0; j += step) {
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
break; break;
} }
...@@ -603,7 +603,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -603,7 +603,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
pSupporter->meterIdx = start; pSupporter->meterIdx = start;
for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) { for (int32_t k = start; k <= end; ++k, pSupporter->meterIdx++) {
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
...@@ -630,7 +630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -630,7 +630,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
pSupporter->subgroupIdx); pSupporter->subgroupIdx);
for (int32_t k = start; k <= end; ++k) { for (int32_t k = start; k <= end; ++k) {
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
...@@ -681,7 +681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { ...@@ -681,7 +681,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) {
while (pSupporter->meterIdx < pSupporter->numOfMeters) { while (pSupporter->meterIdx < pSupporter->numOfMeters) {
int32_t k = pSupporter->meterIdx; int32_t k = pSupporter->meterIdx;
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
...@@ -958,7 +958,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { ...@@ -958,7 +958,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
doMultiMeterSupplementaryScan(pQInfo); doMultiMeterSupplementaryScan(pQInfo);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query killed, abort", pQInfo); dTrace("QInfo:%p query killed, abort", pQInfo);
return; return;
} }
...@@ -998,7 +998,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { ...@@ -998,7 +998,7 @@ static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) {
vnodeScanAllData(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv); doFinalizeResult(pRuntimeEnv);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
return; return;
} }
...@@ -1033,7 +1033,7 @@ static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) { ...@@ -1033,7 +1033,7 @@ static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) {
vnodeScanAllData(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv);
doFinalizeResult(pRuntimeEnv); doFinalizeResult(pRuntimeEnv);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
return; return;
} }
...@@ -1087,7 +1087,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter ...@@ -1087,7 +1087,7 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter
initCtxOutputBuf(pRuntimeEnv); initCtxOutputBuf(pRuntimeEnv);
vnodeScanAllData(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv);
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
return; return;
} }
...@@ -1301,7 +1301,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { ...@@ -1301,7 +1301,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) {
pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->useconds += (taosGetTimestampUs() - st);
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
pQInfo->over = 1; pQInfo->over = 1;
} else { } else {
...@@ -1345,7 +1345,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { ...@@ -1345,7 +1345,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
/* record the total elapsed time */ /* record the total elapsed time */
pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->useconds += (taosGetTimestampUs() - st);
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; pQInfo->over = isQueryKilled(pQInfo) ? 1 : 0;
taosInterpoSetStartInfo(&pQInfo->pTableQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, taosInterpoSetStartInfo(&pQInfo->pTableQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
pQInfo->query.interpoType); pQInfo->query.interpoType);
......
...@@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle { ...@@ -124,6 +124,7 @@ typedef struct STsdbQueryHandle {
int32_t tableIndex; int32_t tableIndex;
bool isFirstSlot; bool isFirstSlot;
void * qinfo; // query info handle, for debug purpose void * qinfo; // query info handle, for debug purpose
SSkipListIterator* memIter;
} STsdbQueryHandle; } STsdbQueryHandle;
int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) { int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
...@@ -335,7 +336,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max ...@@ -335,7 +336,6 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
SDataRow row = SL_GET_NODE_DATA(node); SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) > maxKey) break; if (dataRowKey(row) > maxKey) break;
// Convert row data to column data
if (*skey == INT64_MIN) { if (*skey == INT64_MIN) {
*skey = dataRowKey(row); *skey = dataRowKey(row);
...@@ -345,13 +345,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max ...@@ -345,13 +345,13 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
int32_t offset = 0; int32_t offset = 0;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0); SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, i);
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
offset += pColInfo->info.bytes; offset += pColInfo->info.bytes;
} }
numOfRows++; numOfRows++;
if (numOfRows > maxRowsToRead) break; if (numOfRows >= maxRowsToRead) break;
}; };
return numOfRows; return numOfRows;
...@@ -368,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { ...@@ -368,8 +368,13 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
int32_t rows = 0; int32_t rows = 0;
if (pTable->mem != NULL) { if (pTable->mem != NULL) {
SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle); // create mem table iterator if it is not created yet
if (pHandle->memIter == NULL) {
pHandle->memIter = tSkipListCreateIter(pTable->mem->pData);
}
rows = tsdbReadRowsFromCache(pHandle->memIter, INT64_MAX, 2, &skey, &ekey, pHandle);
} }
SDataBlockInfo blockInfo = { SDataBlockInfo blockInfo = {
...@@ -392,7 +397,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData ...@@ -392,7 +397,9 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SData
} }
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
// in case of data in cache, all data has been kept in column info object.
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
return pHandle->pColumns;
} }
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {} int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
......
...@@ -64,6 +64,9 @@ int main(int argc, char *argv[]) { ...@@ -64,6 +64,9 @@ int main(int argc, char *argv[]) {
memset(buf, 0, 512); memset(buf, 0, 512);
} }
taos_close(taos);
getchar();
return 0; return 0;
taos_query(taos, "drop database demo"); taos_query(taos, "drop database demo");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册