提交 bdee045c 编写于 作者: H Haojun Liao

[td-225] add cancel query support, fix bugs in load file blocks when column...

[td-225] add cancel query support, fix bugs in load file blocks when column has already been updated.
上级 ccd9cde5
...@@ -294,11 +294,12 @@ typedef struct STscObj { ...@@ -294,11 +294,12 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSqlObj { typedef struct SSqlObj {
void * signature; void *signature;
STscObj *pTscObj; STscObj *pTscObj;
void (*fp)(); void *SRpcReqContext;
void (*fetchFp)(); void (*fp)();
void * param; void (*fetchFp)();
void *param;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * pStream; void * pStream;
......
...@@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -196,8 +196,8 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.handle = pSql, .handle = pSql,
.code = 0 .code = 0
}; };
rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
pSql->SRpcReqContext = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -422,7 +422,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
* sub-queries not correctly released and master sql object of super table query reaches an abnormal state. * sub-queries not correctly released and master sql object of super table query reaches an abnormal state.
*/ */
pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; pSql->pSubs[i]->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
// taosStopRpcConn(pSql->pSubs[i]->); rpcCancelRequest(pSql->pSubs[i]->SRpcReqContext);
} }
/* /*
......
...@@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -627,7 +627,7 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
//taosStopRpcConn(pSql->thandle); rpcCancelRequest(pSql->SRpcReqContext);
tscTrace("%p query is cancelled", res); tscTrace("%p query is cancelled", res);
} }
......
...@@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet); ...@@ -84,7 +84,7 @@ void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCanelRequest(void *pContext); void rpcCancelRequest(void *pContext);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -792,38 +792,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap ...@@ -792,38 +792,77 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns); int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
//data in buffer has greater timestamp, copy data in file block //data in buffer has greater timestamp, copy data in file block
for (int32_t i = 0; i < requiredNumOfCols; ++i) { int32_t i = 0, j = 0;
while(i < requiredNumOfCols && j < pCols->numOfCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
int32_t bytes = pColInfo->info.bytes;
SDataCol* src = &pCols->cols[j];
if (src->colId < pColInfo->info.colId) {
j++;
continue;
}
int32_t bytes = pColInfo->info.bytes;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) { if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes; pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else { } else {
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
} }
for (int32_t j = 0; j < pCols->numOfCols; ++j) { // todo opt performance if (pColInfo->info.colId == src->colId) {
SDataCol* src = &pCols->cols[j];
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) {
if (pColInfo->info.colId == src->colId) { memmove(pData, src->pData + bytes * start, bytes * num);
} else { // handle the var-string
if (pColInfo->info.type != TSDB_DATA_TYPE_BINARY && pColInfo->info.type != TSDB_DATA_TYPE_NCHAR) { char* dst = pData;
memmove(pData, src->pData + bytes * start, bytes * num);
} else { // handle the var-string // todo refactor, only copy one-by-one
char* dst = pData; for (int32_t k = start; k < num + start; ++k) {
char* p = tdGetColDataOfRow(src, k);
// todo refactor, only copy one-by-one memcpy(dst, p, varDataTLen(p));
for (int32_t k = start; k < num + start; ++k) { dst += bytes;
char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p));
dst += bytes;
}
} }
break;
} }
j++;
i++;
} else { // pColInfo->info.colId < src->colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
char* dst = pData;
for(int32_t k = start; k < num + start; ++k) {
setVardataNull(dst, pColInfo->info.type);
dst += bytes;
}
} else {
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
}
i++;
} }
} }
while (i < requiredNumOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
char* dst = pData;
for(int32_t k = start; k < num + start; ++k) {
setVardataNull(dst, pColInfo->info.type);
dst += pColInfo->info.bytes;
}
} else {
setNullN(pData, pColInfo->info.type, pColInfo->info.bytes, num);
}
i++;
}
pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.win.ekey = tsArray[end];
pQueryHandle->cur.lastKey = tsArray[end] + step; pQueryHandle->cur.lastKey = tsArray[end] + step;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册