diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index e538281bc06c8470e638f7b51ef7689e0a41eef7..bd956aeb7426f145de33ccc82ea65c56157ed919 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,11 +294,12 @@ typedef struct STscObj { } STscObj; typedef struct SSqlObj { - void * signature; - STscObj *pTscObj; - void (*fp)(); - void (*fetchFp)(); - void * param; + void *signature; + STscObj *pTscObj; + void *SRpcReqContext; + void (*fp)(); + void (*fetchFp)(); + void *param; int64_t stime; uint32_t queryId; void * pStream; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5a2054bbcdb500787554a7e4e0eecff14a2fc08d..b376100a7f88942d31e0ed9ec732b9d688b4b9d8 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); + pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); return TSDB_CODE_SUCCESS; } @@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { * sub-queries not correctly released and master sql object of super table query reaches an abnormal state. */ pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; -// taosStopRpcConn(pSql->pSubs[i]->); + rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext); } /* diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 126e34704c4596ad9a8fbc80efbad38c9bdaa06c..b5999c2a4dc6759fe0f7f4422af7aaf9c8db26f6 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) { return; } - //taosStopRpcConn(pSql->thandle); + rpcCancelRequest(pSql->SRpcReqContext); tscTrace("%p query is cancelled", res); } diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 6c5d7fa889b855b91fb47e99e1bf5a80bc88f99e..30e0f9eee1889e8b832812fcc3f17f81c157724e 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCanelRequest(void *pContext); +void rpcCancelRequest(void *pContext); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 732e8e900812acff6bc18ea7cd4277b74d1b917f..748ab49542fe070672049f89f3248086ad0b72c8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -792,38 +792,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); //data in buffer has greater timestamp, copy data in file block - for (int32_t i = 0; i < requiredNumOfCols; ++i) { + int32_t i = 0, j = 0; + while(i < requiredNumOfCols && j < pCols->numOfCols) { SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); - int32_t bytes = pColInfo->info.bytes; - + + SDataCol* src = &pCols->cols[j]; + if (src->colId < pColInfo->info.colId) { + j++; + continue; + } + + int32_t bytes = pColInfo->info.bytes; + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; } else { pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; } - - for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance - SDataCol* src = &pCols->cols[j]; - - if (pColInfo->info.colId == src->colId) { - - if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { - memmove(pData, src->pData + bytes * start, bytes * num); - } else { // handle the var-string - char* dst = pData; - - // todo refactor, only copy one-by-one - for (int32_t k = start; k < num + start; ++k) { - char* p = tdGetColDataOfRow(src, k); - memcpy(dst, p, varDataTLen(p)); - dst += bytes; - } + + if (pColInfo->info.colId == src->colId) { + + if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { + memmove(pData, src->pData + bytes * start, bytes * num); + } else { // handle the var-string + char* dst = pData; + + // todo refactor, only copy one-by-one + for (int32_t k = start; k < num + start; ++k) { + char* p = tdGetColDataOfRow(src, k); + memcpy(dst, p, varDataTLen(p)); + dst += bytes; } - - break; } + + j++; + i++; + } else { // pColInfo->info.colId < src->colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + i++; } } + + while (i < requiredNumOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; + } + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + char* dst = pData; + + for(int32_t k = start; k < num + start; ++k) { + setVardataNull(dst, pColInfo->info.type); + dst += pColInfo->info.bytes; + } + } else { + setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num); + } + + i++; + } pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step;