提交 224fa95f 编写于 作者: H hjxilinx

[td-98] fix bugs in query processing

上级 fb92f93f
......@@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql = sToken.z;
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (pSql->asyncTblPos == NULL) {
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
}
}
int32_t len = cend - cstart + 1;
......@@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/*
* For async insert, after get the metermeta from server, the sql string will not be
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information.
* For async insert, after get the table meta from server, the sql string will not be
* parsed using the new table meta to avoid the overhead cause by get table meta data information.
* And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position.
*/
......
......@@ -5168,7 +5168,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo);
} else {
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size);
dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size,
pQuery->rec.total);
}
sem_post(&pQInfo->dataReady);
......@@ -5838,7 +5839,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
vnodeParametersSafetyCheck(pQuery);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
_clean_memory:
......@@ -6157,18 +6158,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
}
return pQInfo->code;
}
sem_wait(&pQInfo->dataReady);
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
pQInfo->code);
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
return pQInfo->code;
}
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
......@@ -6213,6 +6210,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data);
} else {
setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code;
}
......
......@@ -49,9 +49,8 @@ typedef struct SQueryFilePos {
} SQueryFilePos;
typedef struct SDataBlockLoadInfo {
int32_t fileListIndex;
int32_t fileId;
int32_t slotIdx;
SFileGroup* fileGroup;
int32_t slot;
int32_t sid;
SArray *pLoadedCols;
} SDataBlockLoadInfo;
......@@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
}
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->slot = -1;
pBlockLoadInfo->sid = -1;
pBlockLoadInfo->fileListIndex = -1;
pBlockLoadInfo->fileGroup = NULL;
}
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
......@@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1;
}
static int fileOrderComparFn(const void *p1, const void *p2) {
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
if (pInfo1->fileId == pInfo2->fileId) {
return 0;
}
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
}
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
char suffix[] = ".head";
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
struct dirent *pEntry = NULL;
pVnodeFilesInfo->vnodeId = vnodeId;
char* tsDirectory = "";
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
// strerror(errno));
return;
}
while ((pEntry = readdir(pDir)) != NULL) {
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
continue;
}
if (pEntry->d_type & DT_DIR) {
continue;
}
size_t len = strlen(pEntry->d_name);
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
continue;
}
int32_t vid = 0;
int32_t fid = 0;
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
if (vid != vnodeId) { /* ignore error files */
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
continue;
}
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
// if (fid > pVnode->fileId || fid < firstFid) {
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
// fid, firstFid, pVnode->fileId);
// continue;
// }
assert(fid >= 0 && vid >= 0);
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
}
closedir(pDir);
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
// pVnodeFilesInfo->dbFilePathPrefix);
// order the files information according their names */
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
}
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
// todo 1. filter not exist table
......@@ -807,6 +735,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
}
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true;
}
......@@ -815,6 +748,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// failed to load data from disk, abort current query
if (blockLoaded == false) {
taosArrayDestroy(sa);
tfree(data);
return false;
}
......@@ -1001,10 +937,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
return pHandle->pColumns;
} else {
SArray *sa = getDefaultLoadColumns(pHandle, true);
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else {
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
}
}
}
}
......@@ -1361,7 +1303,10 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
tfree(pTableCheckInfo->pDataCols->buf);
if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册