diff --git a/examples/JDBC/readme.md b/examples/JDBC/readme.md index c7d7875308d248c1abef8d47bc69a69e91374dbb..c5588a5b255970b1e509abc5b0a8410dcbf52a43 100644 --- a/examples/JDBC/readme.md +++ b/examples/JDBC/readme.md @@ -10,4 +10,4 @@ | 6 | taosdemo | This is an internal tool for testing Our JDBC-JNI, JDBC-RESTful, RESTful interfaces | -more detail: https://docs.taosdata.com/reference/connector/java/ \ No newline at end of file +more detail: https://docs.taosdata.com/connector/java/ diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 21d299f28d643bb7fdad2a017870ce52b3516d69..d8eecdfc64a7ca348111872edc9ba9f9ee5e4913 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -111,6 +111,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) +#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) // #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) // diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 9a3838a6b42a492b4242d1077b54de71856c2b53..8c91eacf1eefcdbfffb9ce72a7dec80a86e92e3e 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -25,6 +25,10 @@ #include "tref.h" #include "ttimer.h" +static tb_uid_t processSuid(tb_uid_t suid, char* db){ + return suid + MurmurHash3_32(db, strlen(db)); +} + static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t) { char* string = NULL; @@ -681,7 +685,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.numOfColumns = req.schemaRow.nCols; pReq.numOfTags = req.schemaTag.nCols; pReq.commentLen = -1; - pReq.suid = req.suid; + pReq.suid = processSuid(req.suid, pRequest->pDb); pReq.source = TD_REQ_FROM_TAOX; pReq.igExists = true; @@ -753,7 +757,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { // build drop stable pReq.igNotExists = true; pReq.source = TD_REQ_FROM_TAOX; - pReq.suid = req.suid; + pReq.suid = processSuid(req.suid, pRequest->pDb); STscObj* pTscObj = pRequest->pTscObj; SName tableName = {0}; @@ -871,6 +875,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { if (pCreateReq->type == TSDB_CHILD_TABLE) { STableMeta* pTableMeta = NULL; SName sName = {0}; + pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb); toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); if (code != TSDB_CODE_SUCCESS) { @@ -1008,6 +1013,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pDropReq = req.pReqs + iReq; pDropReq->igNotExists = true; + pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb); SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1922,6 +1928,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; + void* schemaContent = NULL; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); @@ -2008,27 +2015,49 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SDecoder decoderTmp = {0}; SVCreateTbReq pCreateReq = {0}; - tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { - tDecoderClear(&decoderTmp); - taosMemoryFreeClear(pCreateReq.comment); - taosArrayDestroy(pCreateReq.ctb.tagName); - goto end; - } + do{ + tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); + if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { + code = TSDB_CODE_MSG_DECODE_ERROR; + break; + } + + if (strcmp(tbName, pCreateReq.name) != 0) { + break; + } + + pCreateReq.ctb.suid = processSuid(pCreateReq.ctb.suid, pRequest->pDb); - ASSERT(pCreateReq.type == TSDB_CHILD_TABLE); - if (strcmp(tbName, pCreateReq.name) == 0) { - schemaLen = *lenTmp; - schemaData = *dataTmp; + int32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, &pCreateReq, len, code); + if(code != 0) { + code = TSDB_CODE_MSG_ENCODE_ERROR; + break; + } + taosMemoryFree(schemaContent); + schemaContent = taosMemoryMalloc(len); + if(!schemaContent) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + SEncoder encoder = {0}; + tEncoderInit(&encoder, schemaContent, len); + code = tEncodeSVCreateTbReq(&encoder, &pCreateReq); + if (code != 0) { + tEncoderClear(&encoder); + code = TSDB_CODE_MSG_ENCODE_ERROR; + break; + } + schemaLen = len; + schemaData = schemaContent; strcpy(pName.tname, pCreateReq.ctb.stbName); - tDecoderClear(&decoderTmp); - taosMemoryFreeClear(pCreateReq.comment); - taosArrayDestroy(pCreateReq.ctb.tagName); - break; - } + tEncoderClear(&encoder); + }while(0); tDecoderClear(&decoderTmp); taosMemoryFreeClear(pCreateReq.comment); taosArrayDestroy(pCreateReq.ctb.tagName); + if(code != 0) goto end; + if(schemaLen != 0) break; } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); @@ -2217,6 +2246,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); + taosMemoryFree(schemaContent); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 53e5cd7a8884208c2e59733b17a0e9572de42348..e9bb7e3d09bc669d531d7df83fcb6902f2805bbd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -423,19 +423,6 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { return win; } -static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) { - int32_t rowLen = 0; - for (int32_t i = 0; i < pCond->numOfCols; ++i) { - rowLen += pCond->colList[i].bytes; - } - - // make sure the output SSDataBlock size be less than 2MB. - const int32_t TWOMB = 2 * 1024 * 1024; - if ((*capacity) * rowLen > TWOMB) { - (*capacity) = TWOMB / rowLen; - } -} - // init file iterator static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { size_t numOfFileset = taosArrayGetSize(aDFileSet); @@ -618,9 +605,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd goto _end; } - // todo refactor. - limitOutputBufferSize(pCond, &pReader->capacity); - // allocate buffer in order to load data blocks from file SBlockLoadSuppInfo* pSup = &pReader->suppInfo; pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg)); @@ -1611,9 +1595,9 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 - " - %" PRId64 " %s", + " - %" PRId64 ", uid:%"PRIu64", %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, - pReader->idStr); + pBlockScanInfo->uid, pReader->idStr); pReader->cost.buildmemBlock += elapsedTime; return code; @@ -1639,8 +1623,10 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return false; } -static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, +static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, SVersionRange* pVerRange) { + int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1; + while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); if (!hasVal) { @@ -1649,8 +1635,15 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBKEY k = TSDBROW_KEY(&row); - if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, - pVerRange)) { + if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) { + pScanInfo->lastKey = k.ts; + } else { + // the qualifed ts may equal to k.ts, only a greater version one. + // here we need to fallback one step. + if (pScanInfo->lastKey == k.ts) { + pScanInfo->lastKey -= step; + } + return true; } } @@ -2316,6 +2309,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan w.ekey = pScanInfo->lastKey + step; } + tsdbDebug("init last block reader, window:%"PRId64"-%"PRId64", uid:%"PRIu64", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pLBlockReader->pInfo, false, pReader->idStr); @@ -2755,18 +2749,6 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc); } -// reset the last del file index -static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) { - void* p = taosHashIterate(pStatus->pTableMap, NULL); - while (p != NULL) { - STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p; - - // reset the last del file index - pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order); - p = taosHashIterate(pStatus->pTableMap, p); - } -} - static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; @@ -3082,7 +3064,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // this file does not have data files, let's start check the last block file if exists if (pBlockIter->numOfBlocks == 0) { - resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } } @@ -3114,7 +3095,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // data blocks in current file are exhausted, let's try the next file now tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order); - resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } else { code = initForFirstBlockInFile(pReader, pBlockIter); @@ -3126,7 +3106,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // this file does not have blocks, let's start check the last block file if (pBlockIter->numOfBlocks == 0) { - resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order); goto _begin; } } @@ -3840,11 +3819,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL pCond->twindows.ekey -= 1; } - int32_t capacity = 0; - if (pResBlock == NULL) { - capacity = 4096; - } else { - capacity = pResBlock->info.capacity; + int32_t capacity = pVnode->config.tsdbCfg.maxRows; + if (pResBlock != NULL) { + blockDataEnsureCapacity(pResBlock, capacity); } int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f90cc6699b61f0edaf459169421fa902f589ada5..29fe9817c915371367d45dc7ef561386aab52d6a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -781,6 +781,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } + + if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { + pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; + } } SSDataBlock* result = doGroupedTableScan(pOperator); @@ -884,7 +888,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); - blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); +// blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2f991288ffd0201be79ed3392befcd5da669294e..184c1c8abf1bb240b0a1da0ab436ddfb0e03f27e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -44,7 +44,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (pMeta->pTasks == NULL) { goto _err; } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index cd508f6fe9e71ef90fc5c156685e95f5972931e3..8e5f79137fd5f693e012849ce468fffed2870fa9 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -420,7 +420,13 @@ static void transHttpEnvInit() { uv_loop_init(http->loop); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - + if (NULL == http->asyncPool) { + taosMemoryFree(http->loop); + taosMemoryFree(http); + http = NULL; + return; + } + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { taosMemoryFree(http->loop); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index aeaa4fcafd1d2bd609b134368beee1c0c656227b..897e68a12687e3ae713f685a2898a4061bb78f81 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -18,6 +18,7 @@ #include "taoserror.h" #define PROCESS_ITEM 12 +#define UUIDLEN37 37 typedef struct { uint64_t user; @@ -830,7 +831,8 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { return 0; #elif defined(_TD_DARWIN_64) uuid_t uuid = {0}; - char buf[37] = {0}; + char buf[UUIDLEN37]; + memset(buf, 0, UUIDLEN37); uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null uuid_unparse_lower(uuid, buf); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d610d26f97317fec227f6c7394ff9639cf3aab69..57b1998155bf24221eb2e57dbcdb7d8312f8f9eb 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -88,6 +88,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") +TAOS_DEFINE_ERROR(TSDB_CODE_MSG_ENCODE_ERROR, "Msg encode error") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 6c71c5cea7a3ea67e21a520933d66b3508607b45..a1682f47b3e229e24a9b7bdd750b8cbe4890dea5 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -29,6 +29,7 @@ class TDSimClient: self.testCluster = False self.path = path self.cfgDict = { + "fqdn": "localhost", "numOfLogLines": "100000000", "locale": "en_US.UTF-8", "charset": "UTF-8", @@ -119,6 +120,7 @@ class TDDnode: self.asan = False self.remoteIP = "" self.cfgDict = { + "fqdn": "localhost", "monitor": "0", "maxShellConns": "30000", "locale": "en_US.UTF-8", diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index f2fbd8486570e385332be5280f58f25af7ae4d75..0596241ce1f6c892313ccca219c131476246770c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -194,6 +194,11 @@ class TDTestCase: tdSql.checkData(0, 2, None) tdSql.checkData(1, 1, 1) tdSql.checkData(1, 2, '{"k1":1,"k2":"hello"}') + + tdSql.query("select * from information_schema.ins_tables where table_name = 'stt4'") + uid1 = tdSql.getData(0, 5) + uid2 = tdSql.getData(1, 5) + tdSql.checkNotEqual(uid1, uid2) return def checkWal1Vgroup(self):