提交 f54daccd 编写于 作者: S slguan

Merge remote-tracking branch 'origin/develop' into feature/vpeer

...@@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { ...@@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql = sToken.z; sql = sToken.z;
} }
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (pSql->asyncTblPos == NULL) {
assert(code == TSDB_CODE_ACTION_IN_PROGRESS);
}
} }
int32_t len = cend - cstart + 1; int32_t len = cend - cstart + 1;
...@@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/* /*
* For async insert, after get the metermeta from server, the sql string will not be * For async insert, after get the table meta from server, the sql string will not be
* parsed using the new metermeta to avoid the overhead cause by get metermeta data information. * parsed using the new table meta to avoid the overhead cause by get table meta data information.
* And during the getMeterMetaCallback function, the sql string will be parsed from the * And during the getMeterMetaCallback function, the sql string will be parsed from the
* interrupted position. * interrupted position.
*/ */
......
...@@ -5168,7 +5168,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { ...@@ -5168,7 +5168,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
} else { } else {
dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size,
pQuery->rec.total);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
...@@ -5838,7 +5839,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5838,7 +5839,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
vnodeParametersSafetyCheck(pQuery); vnodeParametersSafetyCheck(pQuery);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo; return pQInfo;
_clean_memory: _clean_memory:
...@@ -6157,18 +6158,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { ...@@ -6157,18 +6158,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) { return pQInfo->code;
return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
}
} }
sem_wait(&pQInfo->dataReady); sem_wait(&pQInfo->dataReady);
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size,
pQInfo->code); pQInfo->code);
return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); return pQInfo->code;
} }
bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
...@@ -6213,6 +6210,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c ...@@ -6213,6 +6210,7 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) {
code = doDumpQueryResult(pQInfo, (*pRsp)->data); code = doDumpQueryResult(pQInfo, (*pRsp)->data);
} else { } else {
setQueryStatus(pQuery, QUERY_OVER);
code = pQInfo->code; code = pQInfo->code;
} }
......
...@@ -49,9 +49,8 @@ typedef struct SQueryFilePos { ...@@ -49,9 +49,8 @@ typedef struct SQueryFilePos {
} SQueryFilePos; } SQueryFilePos;
typedef struct SDataBlockLoadInfo { typedef struct SDataBlockLoadInfo {
int32_t fileListIndex; SFileGroup* fileGroup;
int32_t fileId; int32_t slot;
int32_t slotIdx;
int32_t sid; int32_t sid;
SArray *pLoadedCols; SArray *pLoadedCols;
} SDataBlockLoadInfo; } SDataBlockLoadInfo;
...@@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { ...@@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
} }
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) { static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1; pBlockLoadInfo->slot = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->sid = -1; pBlockLoadInfo->sid = -1;
pBlockLoadInfo->fileListIndex = -1; pBlockLoadInfo->fileGroup = NULL;
} }
static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
...@@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { ...@@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->fileListIndex = -1; pCompBlockLoadInfo->fileListIndex = -1;
} }
static int fileOrderComparFn(const void *p1, const void *p2) {
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
if (pInfo1->fileId == pInfo2->fileId) {
return 0;
}
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
}
void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
char suffix[] = ".head";
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
struct dirent *pEntry = NULL;
pVnodeFilesInfo->vnodeId = vnodeId;
char* tsDirectory = "";
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
// strerror(errno));
return;
}
while ((pEntry = readdir(pDir)) != NULL) {
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
continue;
}
if (pEntry->d_type & DT_DIR) {
continue;
}
size_t len = strlen(pEntry->d_name);
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
continue;
}
int32_t vid = 0;
int32_t fid = 0;
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
if (vid != vnodeId) { /* ignore error files */
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
continue;
}
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
// if (fid > pVnode->fileId || fid < firstFid) {
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
// fid, firstFid, pVnode->fileId);
// continue;
// }
assert(fid >= 0 && vid >= 0);
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
}
closedir(pDir);
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
// pVnodeFilesInfo->dbFilePathPrefix);
// order the files information according their names */
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
}
tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
// todo 1. filter not exist table // todo 1. filter not exist table
...@@ -807,6 +735,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -807,6 +735,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
} }
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup;
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
blockLoaded = true; blockLoaded = true;
} }
...@@ -815,6 +748,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -815,6 +748,9 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// failed to load data from disk, abort current query // failed to load data from disk, abort current query
if (blockLoaded == false) { if (blockLoaded == false) {
taosArrayDestroy(sa);
tfree(data);
return false; return false;
} }
...@@ -1001,10 +937,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList ...@@ -1001,10 +937,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
SArray *sa = getDefaultLoadColumns(pHandle, true); SArray *sa = getDefaultLoadColumns(pHandle, true);
doLoadDataFromFileBlock(pHandle); // data block has been loaded, todo extract method
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
return pHandle->pColumns; if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) {
return pHandle->pColumns;
} else {
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
}
} }
} }
} }
...@@ -1361,7 +1303,10 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { ...@@ -1361,7 +1303,10 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter); tSkipListDestroyIter(pTableCheckInfo->iter);
tfree(pTableCheckInfo->pDataCols->buf); if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf);
}
tfree(pTableCheckInfo->pDataCols); tfree(pTableCheckInfo->pDataCols);
tfree(pTableCheckInfo->pCompInfo); tfree(pTableCheckInfo->pCompInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册