From bdee045c561021da95cd01106636ac3b4456bcc6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Jun 2020 18:40:35 +0800 Subject: [PATCH] [td-225] add cancel query support, fix bugs in load file blocks when column has already been updated. --- src/client/inc/tsclient.h | 11 ++--- src/client/src/tscServer.c | 4 +- src/client/src/tscSql.c | 2 +- src/inc/trpc.h | 2 +- src/tsdb/src/tsdbRead.c | 83 ++++++++++++++++++++++++++++---------- 5 files changed, 71 insertions(+), 31 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e538281bc0..bd956aeb74 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,11 +294,12 @@ typedef struct STscObj { } STscObj; typedef struct SSqlObj { - void * signature; - STscObj *pTscObj; - void (*fp)(); - void (*fetchFp)(); - void * param; + void *signature; + STscObj *pTscObj; + void *SRpcReqContext; + void (*fp)(); + void (*fetchFp)(); + void *param; int64_t stime; uint32_t queryId; void * pStream; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5a2054bbcd..b376100a7f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { * sub-queries not correctly released and master sql object of super table query reaches an abnormal state. */ pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; -// taosStopRpcConn(pSql->pSubs[i]->); + rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext); } /* diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 126e34704c..b5999c2a4d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - //taosStopRpcConn(pSql->thandle); + rpcCancelRequest(pSql->SRpcReqContext); tscTrace("%p query is cancelled", res); } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 6c5d7fa889..30e0f9eee1 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCanelRequest(void *pContext); +void rpcCancelRequest(void *pContext); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 732e8e9008..748ab49542 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -792,38 +792,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); //data in buffer has greater timestamp, copy data in file block - for (int32_t i = 0; i < requiredNumOfCols; ++i) { + int32_t i = 0, j = 0; + while(i < requiredNumOfCols && j < pCols->numOfCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - int32_t bytes = pColInfo->info.bytes; - + + SDataCol* src = &pCols->cols[j]; + if (src->colId < pColInfo->info.colId) { + j++; + continue; + } + + int32_t bytes = pColInfo->info.bytes; + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; } - - for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance - SDataCol* src = &pCols->cols[j]; - - if (pColInfo->info.colId == src->colId) { - - if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { - memmove(pData, src->pData + bytes * start, bytes * num); - } else { // handle the var-string - char* dst = pData; - - // todo refactor, only copy one-by-one - for (int32_t k = start; k < num + start; ++k) { - char* p = tdGetColDataOfRow(src, k); - memcpy(dst, p, varDataTLen(p)); - dst += bytes; - } + + if (pColInfo->info.colId == src->colId) { + + if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { + memmove(pData, src->pData + bytes * start, bytes * num); + } else { // handle the var-string + char* dst = pData; + + // todo refactor, only copy one-by-one + for (int32_t k = start; k < num + start; ++k) { + char* p = tdGetColDataOfRow(src, k); + memcpy(dst, p, varDataTLen(p)); + dst += bytes; } - - break; } + + j++; + i++; + } else { // pColInfo->info.colId < src->colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + i++; } } + + while (i < requiredNumOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; + } + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += pColInfo->info.bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + + i++; + } pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step; -- GitLab