diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9657912716fe019b8d92abd4fc4232cf3a133301..dad32e58da1dcea4b6850f2aaeca50e538e1c14d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -213,8 +213,6 @@ typedef struct SDataBlockList { int32_t idx; uint32_t nSize; uint32_t nAlloc; - char * userParam; /* user assigned parameters for async query */ - void * udfp; /* user defined function pointer, used in async model */ STableDataBlocks **pData; } SDataBlockList; @@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); -void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index ee2143ff6d1d8c2ba2dddcb173506a27f4603530..d1596ca99afeb4ef0a5c0504d91e4c4ecd3a0eb9 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -77,7 +77,7 @@ int32_t main(int32_t argc, char *argv[]) { } /* Set termination handler. */ - struct sigaction act; + struct sigaction act = {0}; act.sa_flags = SA_SIGINFO; act.sa_sigaction = signal_handler; sigaction(SIGTERM, &act, NULL); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 682aee4c0b3f613d920b29a8add684604b1b9347..f71729c87a1c3a73cec1ae1df744c0046c73c6c3 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -290,8 +290,9 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { if (qHasMoreResultsToRetrieve(pQInfo)) { dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); } else { // no further execution invoked, release the ref to vnode - dnodeProcessReadResult(pVnode, pMsg); - //vnodeRelease(pVnode); + qDestroyQueryInfo(pQInfo); +// dnodeProcessReadResult(pVnode, pMsg); + vnodeRelease(pVnode); } } @@ -305,5 +306,4 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { rpcSendResponse(&rpcRsp); dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); - vnodeRelease(pVnode); } diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 5adce04efa44d7e85bc9ccf482dd21e0deff20b5..8c8c1a3a12f2392441abdd4ca673b45641cf1e22 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -198,6 +198,12 @@ typedef struct SQInfo { */ int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +/** + * destroy the query info struct + * @param pQInfo + */ +void qDestroyQueryInfo(SQInfo* pQInfo); + /** * query on single table * @param pReadMsg diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 4c4014996e9a9dda4356b8d0aa1e593a8d70e702..97bfe488ae98b3970f86cb80ea6913011ca61b82 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1566,6 +1566,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } destroyResultBuf(pRuntimeEnv->pResultBuf); + tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -2244,11 +2245,8 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { } SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - // tSidSetDestroy(&pQInfo->pSidSet); - if (pQInfo->pTableDataInfo != NULL) { // size_t num = taosHashGetSize(pQInfo->pTableIdList); for (int32_t j = 0; j < 0; ++j) { @@ -4202,8 +4200,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable } pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); + taosArrayDestroy(cols); + pRuntimeEnv->pQuery = pQuery; - pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vnodeIndex = -1; if (param != NULL) { @@ -5444,13 +5443,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, *tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); } - - dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " - "timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 - ", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64, + + dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, " + "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, - pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType, - pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols, + pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); @@ -5974,6 +5971,8 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); + taosArrayDestroy(pQInfo->pTableIdList); + dTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check @@ -6120,6 +6119,10 @@ _query_over: return TSDB_CODE_SUCCESS; } +void qDestroyQueryInfo(SQInfo* pQInfo) { + freeQInfo(pQInfo); +} + void qTableQuery(SQInfo *pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) { dTrace("%p freed abort query", pQInfo); @@ -6133,6 +6136,9 @@ void qTableQuery(SQInfo *pQInfo) { dTrace("QInfo:%p query task is launched", pQInfo); +// sem_post(&pQInfo->dataReady); +// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER; + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); if (numOfTables == 1) { singleTableQueryImpl(pQInfo); @@ -6212,7 +6218,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client - freeQInfo(pQInfo); } return code; diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 21818e572f3fc49a2841c2f494362e2a7103f9f0..52aa07d968dd66af6267fe35fb9fa101fbba3621 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) va_start(argpointer, format); int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer); if (writeLen <= 0) { - char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE]; + char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0}; writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer); strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE); len += MAX_LOGLINE_CONTENT_SIZE; diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index b20711df1dc4b4a515dad83105bc9977588f5483..b2673413ae9eb66247c9f4ab7c68baac694fcbfb 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -337,6 +337,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); */ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); +/** + * clean up the query handle + * @param queryHandle + */ +void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle); + #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 30d0e949507b997fa1cbee4cfcac338fe3c16f7e..dd69ae77f094b14b43d56ea95949af5915b878d8 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -282,7 +282,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; - pQueryHandle->pColumns = pColumnInfo; pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; @@ -331,9 +330,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); - int32_t vnodeId = 1; - vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo); - return (tsdb_query_handle_t)pQueryHandle; } @@ -468,6 +464,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { } } + taosArrayDestroy(sa); return pQueryHandle->realNumOfRows > 0; } @@ -751,7 +748,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf int32_t tid = pCheckInfo->tableId.tid; while (pCheckInfo->pFileGroup != NULL) { - if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { + if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) { break; } @@ -761,7 +758,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pCheckInfo->pFileGroup->fileId, tid); pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); - continue; } @@ -790,7 +786,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks); // load first data block into memory failed, caused by disk block error - bool blockLoaded = false; + bool blockLoaded = false; SArray *sa = getDefaultLoadColumns(pQueryHandle, true); // todo no need to loaded at all @@ -810,8 +806,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pFile->fd = open(pFile->fname, O_RDONLY); } - if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, - pCheckInfo->pDataCols, data) == 0) { + if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { blockLoaded = true; } @@ -825,12 +820,19 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf // todo search qualified points in blk, according to primary key (timestamp) column SDataCols* pDataCols = pCheckInfo->pDataCols; + + TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; + assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); + cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order); cur->fid = pCheckInfo->pFileGroup->fileId; assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); + + taosArrayDestroy(sa); + tfree(data); return pQueryHandle->realNumOfRows > 0; } @@ -838,8 +840,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); -// SQueryFilePos* cur = &pHandle->cur; - STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); if (!pCheckInfo->checkFirstFileBlock) { @@ -1351,3 +1351,34 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo return result; } } + +void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle; + + + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + for(int32_t i = 0; i < size; ++i) { + STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + tSkipListDestroyIter(pTableCheckInfo->iter); + + tfree(pTableCheckInfo->pDataCols->buf); + tfree(pTableCheckInfo->pDataCols); + + tfree(pTableCheckInfo->pCompInfo); + tfree(pTableCheckInfo->compIndex); + } + + taosArrayDestroy(pQueryHandle->pTableCheckInfo); + + size_t cols = taosArrayGetSize(pQueryHandle->pColumns); + for(int32_t i = 0; i < cols; ++i) { + SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + tfree(pColInfo->pData); + } + + taosArrayDestroy(pQueryHandle->pColumns); + + tfree(pQueryHandle->unzipBuffer); + tfree(pQueryHandle->secondaryUnzipBuffer); + tfree(pQueryHandle); +} diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 853a949162e10ae64677734d77ad7ffeec2eb3bb..d92eabab496888c51f9bb54b2391f4f988d82bfa 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -24,17 +24,43 @@ void taosMsleep(int mseconds); +static int32_t doQuery(TAOS* taos, const char* sql) { + int32_t code = taos_query(taos, sql); + if (code != 0) { + printf("failed to execute query, reason:%s\n", taos_errstr(taos)); + return -1; + } + + TAOS_RES* res = taos_use_result(taos); + TAOS_ROW row = NULL; + char buf[512] = {0}; + + int32_t numOfFields = taos_num_fields(res); + TAOS_FIELD* pFields = taos_fetch_fields(res); + + while((row = taos_fetch_row(res)) != NULL) { + taos_print_row(buf, row, pFields, numOfFields); + printf("%s\n", buf); + memset(buf, 0, 512); + } + + taos_free_result(res); +} + int main(int argc, char *argv[]) { TAOS * taos; char qstr[1024]; TAOS_RES *result; + // connect to server if (argc < 2) { printf("please input server-ip \n"); return 0; } + taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg"); + // init TAOS taos_init(); @@ -45,6 +71,22 @@ int main(int argc, char *argv[]) { } printf("success to connect to server\n"); + doQuery(taos, "create database if not exists test"); + doQuery(taos, "use test"); + doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); + doQuery(taos, "select * from tm0;"); + + taos_close(taos); + return 0; taos_query(taos, "drop database demo"); if (taos_query(taos, "create database demo") != 0) { @@ -53,8 +95,10 @@ int main(int argc, char *argv[]) { } printf("success to create database\n"); + taos_query(taos, "use demo"); + // create table if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) { printf("failed to create table, reason:%s\n", taos_errstr(taos)); @@ -62,9 +106,11 @@ int main(int argc, char *argv[]) { } printf("success to create table\n"); + // sleep for one second to make sure table is created on data node // taosMsleep(1000); + // insert 10 records int i = 0; for (i = 0; i < 10; ++i) { @@ -76,6 +122,7 @@ int main(int argc, char *argv[]) { } printf("success to insert rows, total %d rows\n", i); + // query the records sprintf(qstr, "SELECT * FROM m1"); if (taos_query(taos, qstr) != 0) { @@ -83,12 +130,16 @@ int main(int argc, char *argv[]) { exit(1); } + result = taos_use_result(taos); + if (result == NULL) { printf("failed to get result, reason:%s\n", taos_errstr(taos)); exit(1); } + +// TAOS_ROW row; TAOS_ROW row; int rows = 0; @@ -96,6 +147,7 @@ int main(int argc, char *argv[]) { TAOS_FIELD *fields = taos_fetch_fields(result); char temp[256]; + printf("select * from table, result:\n"); // fetch the records row by row while ((row = taos_fetch_row(result))) {