未验证 提交 05374183 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2235 from taosdata/hotfix/killquery

Hotfix/killquery
...@@ -4380,9 +4380,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4380,9 +4380,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
if (index.columnIndex < tscGetNumOfColumns(pTableMeta)) { int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex < numOfCols) {
return invalidSqlErrMsg(pQueryInfo->msg, msg10); return invalidSqlErrMsg(pQueryInfo->msg, msg10);
} else if (index.columnIndex == 0) { } else if (index.columnIndex == numOfCols) {
return invalidSqlErrMsg(pQueryInfo->msg, msg11); return invalidSqlErrMsg(pQueryInfo->msg, msg11);
} }
......
...@@ -89,6 +89,8 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { ...@@ -89,6 +89,8 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
.contLen = 0 .contLen = 0
}; };
if (pMsg->pCont == NULL) return;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_RPC_NOT_READY; rspMsg.code = TSDB_CODE_RPC_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
......
...@@ -115,6 +115,8 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) { ...@@ -115,6 +115,8 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcIpSet *pIpSet) {
.contLen = 0 .contLen = 0
}; };
if (pMsg->pCont == NULL) return;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_RPC_NOT_READY; rpcMsg.code = TSDB_CODE_RPC_NOT_READY;
......
...@@ -26,13 +26,6 @@ ...@@ -26,13 +26,6 @@
#include "dnodeVRead.h" #include "dnodeVRead.h"
#include "vnode.h" #include "vnode.h"
typedef struct {
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
...@@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) {
} }
dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
} }
......
...@@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co ...@@ -77,6 +77,12 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* co
*/ */
bool qHasMoreResultsToRetrieve(qinfo_t qinfo); bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
/**
* kill current ongoing query and free query handle automatically
* @param qinfo
*/
int32_t qKillQuery(qinfo_t qinfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -83,6 +83,7 @@ void rpcSendResponse(const SRpcMsg *pMsg); ...@@ -83,6 +83,7 @@ void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcReportProgress(void *pConn, char *pCont, int contLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -34,6 +34,13 @@ typedef struct { ...@@ -34,6 +34,13 @@ typedef struct {
void *qhandle; //used by query and retrieve msg void *qhandle; //used by query and retrieve msg
} SRspRet; } SRspRet;
typedef struct {
SRspRet rspRet;
void *pCont;
int32_t contLen;
SRpcMsg rpcMsg;
} SReadMsg;
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
...@@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode); ...@@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
void vnodeBuildStatusMsg(void * param); void vnodeBuildStatusMsg(void * param);
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -12,8 +12,8 @@ ...@@ -12,8 +12,8 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "qfill.h"
#include "os.h" #include "os.h"
#include "qfill.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h" #include "hashfunc.h"
...@@ -5822,20 +5822,36 @@ _over: ...@@ -5822,20 +5822,36 @@ _over:
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
*pQInfo = NULL; *pQInfo = NULL;
} else {
SQInfo* pq = (SQInfo*) (*pQInfo);
T_REF_INC(pq);
T_REF_INC(pq);
} }
// if failed to add ref for all meters in this query, abort current query // if failed to add ref for all meters in this query, abort current query
return code; return code;
} }
void qDestroyQueryInfo(qinfo_t pQInfo) { static void doDestoryQueryInfo(SQInfo* pQInfo) {
assert(pQInfo != NULL);
qTrace("QInfo:%p query completed", pQInfo); qTrace("QInfo:%p query completed", pQInfo);
queryCostStatis(pQInfo); // print the query cost summary
// print the query cost summary
queryCostStatis(pQInfo);
freeQInfo(pQInfo); freeQInfo(pQInfo);
} }
void qDestroyQueryInfo(qinfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
int16_t ref = T_REF_DEC(pQInfo);
if (ref == 0) {
doDestoryQueryInfo(pQInfo);
}
}
void qTableQuery(qinfo_t qinfo) { void qTableQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
...@@ -5846,6 +5862,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5846,6 +5862,7 @@ void qTableQuery(qinfo_t qinfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p it is already killed, abort", pQInfo); qTrace("QInfo:%p it is already killed, abort", pQInfo);
qDestroyQueryInfo(pQInfo);
return; return;
} }
...@@ -5861,7 +5878,7 @@ void qTableQuery(qinfo_t qinfo) { ...@@ -5861,7 +5878,7 @@ void qTableQuery(qinfo_t qinfo) {
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo); qDestroyQueryInfo(pQInfo);
} }
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
...@@ -5887,20 +5904,29 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) { ...@@ -5887,20 +5904,29 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) { bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) {
qTrace("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code);
return false; return false;
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
bool ret = false;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
return false; ret = false;
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
return true; ret = true;
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
return true; ret = true;
} else { } else {
assert(0); assert(0);
} }
if (ret) {
T_REF_INC(pQInfo);
qTrace("QInfo:%p has more results waits for client retrieve", pQInfo);
}
return ret;
} }
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) { int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen) {
...@@ -5945,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -5945,6 +5971,19 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return code; return code;
} }
int32_t qKillQuery(qinfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo);
return TSDB_CODE_SUCCESS;
}
static void buildTagQueryResult(SQInfo* pQInfo) { static void buildTagQueryResult(SQInfo* pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
......
...@@ -484,6 +484,15 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg ...@@ -484,6 +484,15 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg
return; return;
} }
// this API is used by server app to keep an APP context in case connection is broken
void rpcReportProgress(void *handle, char *pCont, int contLen) {
SRpcConn *pConn = (SRpcConn *)handle;
// pReqMsg and reqMsgLen is re-used to store the context from app server
pConn->pReqMsg = pCont;
pConn->reqMsgLen = contLen;
}
static void rpcFreeMsg(void *msg) { static void rpcFreeMsg(void *msg) {
if ( msg ) { if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
...@@ -848,6 +857,21 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -848,6 +857,21 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
return pConn; return pConn;
} }
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc;
// if there are pending request, notify the app
tTrace("%s, notify the server app, connection is gone", pConn->info);
SRpcMsg rpcMsg;
rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (pRpc->cfp) (*(pRpc->cfp))(&rpcMsg, NULL);
}
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn == NULL) return; if (pConn == NULL) return;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
...@@ -861,19 +885,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -861,19 +885,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} }
if (pConn->inType) { if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
// if there are pending request, notify the app
tTrace("%s, connection is gone, notify the app", pConn->info);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
(*(pRpc->cfp))(&rpcMsg);
*/
}
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcCloseConn(pConn); rpcCloseConn(pConn);
...@@ -1212,23 +1224,10 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1212,23 +1224,10 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
static void rpcProcessIdleTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0]) { if (pConn->user[0]) {
tTrace("%s, close the connection since no activity", pConn->info); tTrace("%s, close the connection since no activity", pConn->info);
if (pConn->inType && pRpc->cfp) { if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
// if there are pending request, notify the app
tTrace("%s, notify the app, connection is gone", pConn->info);
/*
SRpcMsg rpcMsg;
rpcMsg.pCont = NULL;
rpcMsg.contLen = 0;
rpcMsg.handle = pConn;
rpcMsg.msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
(*(pRpc->cfp))(&rpcMsg);
*/
}
rpcCloseConn(pConn); rpcCloseConn(pConn);
} else { } else {
tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
......
...@@ -27,17 +27,18 @@ ...@@ -27,17 +27,18 @@
#include "vnodeLog.h" #include "vnodeLog.h"
#include "query.h" #include "query.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
} }
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType;
if (vnodeProcessReadMsgFp[msgType] == NULL) { if (vnodeProcessReadMsgFp[msgType] == NULL) {
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]); vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
...@@ -55,16 +56,44 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, ...@@ -55,16 +56,44 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen,
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
} }
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { // notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
killQueryMsg->free = htons(1);
killQueryMsg->header.vgId = htonl(vgId);
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet;
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS; // qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
qDestroyQueryInfo((qinfo_t) killQueryMsg->qhandle);
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
if (contLen != 0) { if (contLen != 0) {
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
...@@ -74,7 +103,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -74,7 +103,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId);
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
...@@ -91,13 +122,34 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -91,13 +122,34 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
return code; return code;
} }
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont; SRetrieveTableMsg *pRetrieve = pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
if (pRetrieve->free == 1) {
vTrace("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
int32_t ret = qKillQuery(pQInfo);
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp;
pRsp->numOfRows = 0;
pRsp->completed = true;
pRsp->useconds = 0;
return ret;
}
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
int32_t code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
...@@ -110,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -110,8 +162,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = pQInfo;
code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
} else { } else { // no further execution invoked, release the ref to vnode
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo); qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -115,15 +115,15 @@ int main(int argc, char *argv[]) { ...@@ -115,15 +115,15 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n"); printf("success to connect to server\n");
// doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1"); // doQuery(taos, "select c1,count(*) from group_db0.group_mt0 where c1<8 group by c1");
doQuery(taos, "select * from test.m1"); // doQuery(taos, "select * from test.m1");
// multiThreadTest(1, taos); // multiThreadTest(1, taos);
// doQuery(taos, "select tbname from test.m1"); // doQuery(taos, "select tbname from test.m1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0') interval(1s) group by t1");
// doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)"); // doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 and tbname in ('lm2_tb0', 'lm2_tb1', 'lm2_tb2') interval(1s)");
// for(int32_t i = 0; i < 100000; ++i) { for(int32_t i = 0; i < 200; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)"); doQuery(taos, "select * from lm2_db0.lm2_stb0");
// } }
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))"); // doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
taos_close(taos); taos_close(taos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册