提交 88f7e7d7 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/table

......@@ -48,7 +48,7 @@ cp ${compile_dir}/../packaging/deb/taosd ${pkg_dir}${install_home_pat
cp ${compile_dir}/../packaging/tools/post.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/../packaging/tools/preun.sh ${pkg_dir}${install_home_path}/script
cp ${compile_dir}/build/bin/taosdemo ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
#cp ${compile_dir}/build/bin/taosdump ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taosd ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin
cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver
......
......@@ -58,7 +58,7 @@ cp %{_compiledir}/../packaging/tools/preun.sh %{buildroot}%{homepath}/scri
cp %{_compiledir}/build/bin/taos %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosd %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdemo %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
#cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin
cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver
cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include
cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include
......@@ -135,7 +135,7 @@ if [ $1 -eq 0 ];then
${csudo} rm -f ${bin_link_dir}/taos || :
${csudo} rm -f ${bin_link_dir}/taosd || :
${csudo} rm -f ${bin_link_dir}/taosdemo || :
${csudo} rm -f ${bin_link_dir}/taosdump || :
#${csudo} rm -f ${bin_link_dir}/taosdump || :
${csudo} rm -f ${cfg_link_dir}/* || :
${csudo} rm -f ${inc_link_dir}/taos.h || :
${csudo} rm -f ${inc_link_dir}/taoserror.h || :
......
......@@ -45,7 +45,8 @@ if [ "$osType" != "Darwin" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taos ${script_dir}/remove_client.sh"
else
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
#bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${script_dir}/remove_client.sh ${script_dir}/set_core.sh"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
else
......
......@@ -36,7 +36,8 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/taos
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${script_dir}/remove.sh"
else
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
#bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdump ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
bin_files="${build_dir}/bin/taosd ${build_dir}/bin/taos ${build_dir}/bin/taosdemo ${build_dir}/bin/tarbitrator ${script_dir}/remove.sh ${script_dir}/set_core.sh"
fi
lib_files="${build_dir}/lib/libtaos.so.${version}"
......
......@@ -16,6 +16,7 @@
#define _BSD_SOURCE
#define _XOPEN_SOURCE 500
#define _DEFAULT_SOURCE
#define _GNU_SOURCE
#include "os.h"
#include "qAst.h"
......@@ -2037,7 +2038,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
char name[TSDB_COL_NAME_LEN] = {0};
SStrToken t = {.z = pSchema->name, .n = (uint32_t)strnlen(pSchema->name, TSDB_COL_NAME_LEN)};
SStrToken t = {.z = pSchema[i].name, .n = (uint32_t)strnlen(pSchema[i].name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) {
......
......@@ -1718,8 +1718,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
pTableMetaInfo->pTableMeta = (STableMeta *) taosCachePut(tscMetaCache, pTableMetaInfo->name,
strlen(pTableMetaInfo->name), pTableMeta, size, tsTableMetaKeepTimer * 1000);
// todo handle out of memory case
if (pTableMetaInfo->pTableMeta == NULL) {
free(pTableMeta);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -164,6 +164,12 @@ typedef struct STsdbQueryCond {
SColumnInfo *colList;
} STsdbQueryCond;
typedef struct SMemRef {
int32_t ref;
void *mem;
void *imem;
} SMemRef;
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rows;
......@@ -193,7 +199,7 @@ typedef struct {
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo);
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo, SMemRef* pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
......@@ -205,7 +211,7 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo);
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo, SMemRef* pRef);
/**
* get the queried table object list
......@@ -223,7 +229,7 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
* @return
*/
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo);
void *qinfo, SMemRef* pRef);
/**
* move to next block if exists
......
......@@ -793,9 +793,9 @@ int main(int argc, char *argv[]) {
(ntables * nrecords_per_table) / (t * nrecords_per_request),
t * 1000);
printf("Spent %.4f seconds to insert %d records with %d record(s) per request: %.2f records/second\n",
t, ntables * nrecords_per_table, nrecords_per_request,
ntables * nrecords_per_table / t);
printf("Spent %.4f seconds to insert %lld records with %d record(s) per request: %.2f records/second\n",
t, (long long int)ntables * nrecords_per_table, nrecords_per_request,
((long long int)ntables * nrecords_per_table) / t);
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
......
......@@ -200,12 +200,6 @@ enum {
QUERY_RESULT_READY = 2,
};
typedef struct SMemRef {
int32_t ref;
void *mem;
void *imem;
} SMemRef;
typedef struct SQInfo {
void* signature;
int32_t code; // error code to returned to client
......
......@@ -1711,6 +1711,19 @@ _clean:
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
static void doFreeQueryHandle(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv->pQueryHandle = NULL;
pRuntimeEnv->pSecQueryHandle = NULL;
SMemRef* pMemRef = &pQInfo->memRef;
assert(pMemRef->ref == 0 && pMemRef->imem == NULL && pMemRef->mem == NULL);
}
static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
if (pRuntimeEnv->pQuery == NULL) {
return;
......@@ -1740,8 +1753,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pFillInfo = taosDestroyFillInfo(pRuntimeEnv->pFillInfo);
destroyResultBuf(pRuntimeEnv->pResultBuf);
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
doFreeQueryHandle(pQInfo);
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
taosTFree(pRuntimeEnv->keyBuf);
......@@ -3536,7 +3548,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
}
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
......@@ -3620,7 +3632,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
}
restoreTimeWindow(&pQInfo->tableGroupInfo, &cond);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
......@@ -4462,7 +4474,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
// update the query time window
pQuery->window = cond.twindow;
......@@ -4484,9 +4496,9 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
}
}
} else if (isPointInterpoQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
} else {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
}
return terrno;
......@@ -4765,7 +4777,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
if (pRuntimeEnv->pQueryHandle == NULL) {
......@@ -4880,7 +4892,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pRuntimeEnv->pQueryHandle = NULL;
}
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
taosArrayDestroy(tx);
taosArrayDestroy(g1);
......@@ -4946,7 +4958,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
// no need to update the lastkey for each table
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo, &pQInfo->memRef);
taosArrayDestroy(g1);
taosArrayDestroy(tx);
......@@ -5155,7 +5167,7 @@ static void doSaveContext(SQInfo *pQInfo) {
setupQueryRangeForReverseScan(pQInfo);
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef);
if (pRuntimeEnv->pSecQueryHandle == NULL) {
longjmp(pRuntimeEnv->env, terrno);
}
......@@ -6841,7 +6853,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
int32_t code = TSDB_CODE_SUCCESS;
#if 0
#if _NON_BLOCKING_RETRIEVE
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pthread_mutex_lock(&pQInfo->lock);
......@@ -6913,6 +6925,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
doFreeQueryHandle(pQInfo);
*continueExec = false;
(*pRsp)->completed = 1; // notify no more result to client
} else {
......
......@@ -198,6 +198,8 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
if (pIMem != NULL) {
tsdbUnRefMemTable(pRepo, pIMem);
}
tsdbDebug("vgId:%d utake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
}
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
......
......@@ -20,7 +20,6 @@
#include "exception.h"
#include "../../query/inc/qAst.h" // todo move to common module
#include "../../query/inc/qExecutor.h" // todo move to common module
#include "tlosertree.h"
#include "tsdb.h"
#include "tsdbMain.h"
......@@ -120,8 +119,9 @@ typedef struct STsdbQueryHandle {
SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
SMemTable *mem; // mem-table
SMemTable *imem; // imem-table, acquired from snapshot
SMemRef *pMemRef;
// SMemTable *mem; // mem-table
// SMemTable *imem; // imem-table, acquired from snapshot
SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
......@@ -184,26 +184,26 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS
return pLocalIdList;
}
static void tsdbMayTakeMemSnapshot(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pSecQueryHandle = (STsdbQueryHandle*) pHandle;
SQInfo *pQInfo = (SQInfo *)(pSecQueryHandle->qinfo);
static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL);
if (pQInfo->memRef.ref++ == 0) {
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
pQInfo->memRef.mem = pSecQueryHandle->mem;
pQInfo->memRef.imem = pSecQueryHandle->imem;
} else {
pSecQueryHandle->mem = (SMemTable *)(pQInfo->memRef.mem);
pSecQueryHandle->imem = (SMemTable *)(pQInfo->memRef.imem);
SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pQueryHandle->pMemRef->ref++ == 0) {
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, (SMemTable**)&(pMemRef->mem), (SMemTable**)&(pMemRef->imem));
}
}
static void tsdbMayUnTakeMemSnapshot(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pSecQueryHandle = (STsdbQueryHandle*) pHandle;
SQInfo *pQInfo = (SQInfo *)(pSecQueryHandle->qinfo);
if (--pQInfo->memRef.ref == 0) {
tsdbUnTakeMemSnapShot(pSecQueryHandle->pTsdb, pSecQueryHandle->mem, pSecQueryHandle->imem);
static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL);
SMemRef* pMemRef = pQueryHandle->pMemRef;
if (--pMemRef->ref == 0) {
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem);
pMemRef->mem = NULL;
pMemRef->imem = NULL;
}
pQueryHandle->pMemRef = NULL;
}
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
......@@ -270,7 +270,7 @@ static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey)
return pNew;
}
static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo) {
static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo, SMemRef* pMemRef) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pQueryHandle == NULL) {
goto out_of_memory;
......@@ -288,13 +288,14 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef;
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
goto out_of_memory;
}
tsdbMayTakeMemSnapshot(pQueryHandle);
assert(pCond != NULL && pCond->numOfCols > 0);
assert(pCond != NULL && pCond->numOfCols > 0 && pMemRef != NULL);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
......@@ -348,8 +349,8 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond*
return NULL;
}
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo);
TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo, SMemRef* pRef) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo, pRef);
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
......@@ -366,7 +367,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
return (TsdbQueryHandleT) pQueryHandle;
}
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pMemRef) {
pCond->twindow = changeTableGroupByLastrow(groupList);
// no qualified table
......@@ -374,7 +375,7 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
return NULL;
}
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pMemRef);
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
return pQueryHandle;
......@@ -396,8 +397,8 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
return res;
}
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pRef);
if (pQueryHandle != NULL) {
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
changeQueryHandleForInterpQuery(pQueryHandle);
......@@ -417,7 +418,7 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
int32_t order = pHandle->order;
// no data in buffer, abort
if (pHandle->mem == NULL && pHandle->imem == NULL) {
if (pHandle->pMemRef->mem == NULL && pHandle->pMemRef->imem == NULL) {
return false;
}
......@@ -426,16 +427,19 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
STableData* pMem = NULL;
STableData* pIMem = NULL;
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables) {
pMem = pHandle->mem->tData[pCheckInfo->tableId.tid];
SMemTable* pMemT = pHandle->pMemRef->mem;
SMemTable* pIMemT = pHandle->pMemRef->imem;
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid];
if (pMem != NULL && pMem->uid == pCheckInfo->tableId.uid) { // check uid
pCheckInfo->iter =
tSkipListCreateIterFromVal(pMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
}
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables) {
pIMem = pHandle->imem->tData[pCheckInfo->tableId.tid];
if (pIMemT && pCheckInfo->tableId.tid < pIMemT->maxTables) {
pIMem = pIMemT->tData[pCheckInfo->tableId.tid];
if (pIMem != NULL && pIMem->uid == pCheckInfo->tableId.uid) { // check uid
pCheckInfo->iiter =
tSkipListCreateIterFromVal(pIMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
......@@ -2029,7 +2033,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
}
STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo);
STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef);
taosTFree(cond.colList);
......
......@@ -2043,6 +2043,7 @@ class ThreadStacks: # stack info for all threads
print("[{sf}] File {filename}, line {lineno}, in {name}".format(
sf=stackFrame, filename=frame.filename, lineno=frame.lineno, name=frame.name))
print(" {}".format(frame.line))
stackFrame += 1
print("-----> End of Thread Info ----->\n")
class ClientManager:
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
import os
import threading
import time
class TDTestCase:
"""
kill query
"""
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def query(self):
conn = taos.connect(host='127.0.0.1', user='root', password='taosdata', config='/etc/config')
cursor = conn.cursor()
while True:
cursor.execute('show queries;')
print('show queries!')
temp = cursor.fetchall()
if temp:
print(temp[0][0])
cursor.execute('kill query %s ;' % temp[0][0])
print('kill query success')
break
time.sleep(0.5)
cursor.close()
conn.close()
def run(self):
tdSql.prepare()
print("==============step1")
os.system('yes | sudo taosdemo -n 100')
print('insert into test.meters 10000000 rows')
t1 = threading.Thread(target=self.query)
t1.setDaemon(True)
t1.start()
print("==============step2")
tdSql.execute('use test;')
try:
print('============begin select * from 10000000 rows')
tdSql.query('select * from test.meters;')
# print(tdSql.queryResult)
except Exception as e:
if not "ProgrammingError('Query terminated'" in str(e):
raise Exception('fail')
print('success')
print('kill query success')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册