提交 eb520a11 编写于 作者: A Alex Duan

Merge branch 'main' into feat/TD-22067-MAIN

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e04f39b GIT_TAG 74c0357
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -99,6 +99,38 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p ...@@ -99,6 +99,38 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* idstr) {
int32_t numOfTables = p->numOfTables;
if (suid != 0) {
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, suid, -1, 1);
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
} else {
for (int32_t i = 0; i < numOfTables; ++i) {
uint64_t uid = p->pTableList[i].uid;
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, uid, -1, 1);
if (p->pSchema != NULL) {
break;
}
tsdbWarn("table:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", uid, idstr);
}
// all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader, const char* idstr) { uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
...@@ -117,11 +149,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -117,11 +149,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0];
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1);
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->numOfTables = numOfTables; p->numOfTables = numOfTables;
int32_t code = setTableSchema(p, suid, idstr);
if (code != TSDB_CODE_SUCCESS) {
tsdbCacherowsReaderClose(p);
return code;
}
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) { if (p->transferBuf == NULL) {
tsdbCacherowsReaderClose(p); tsdbCacherowsReaderClose(p);
......
...@@ -423,19 +423,6 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) { ...@@ -423,19 +423,6 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
return win; return win;
} }
static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
int32_t rowLen = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes;
}
// make sure the output SSDataBlock size be less than 2MB.
const int32_t TWOMB = 2 * 1024 * 1024;
if ((*capacity) * rowLen > TWOMB) {
(*capacity) = TWOMB / rowLen;
}
}
// init file iterator // init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) { static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader) {
size_t numOfFileset = taosArrayGetSize(aDFileSet); size_t numOfFileset = taosArrayGetSize(aDFileSet);
...@@ -618,9 +605,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -618,9 +605,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end; goto _end;
} }
// todo refactor.
limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg)); pSup->pColAgg = taosArrayInit(pCond->numOfCols, sizeof(SColumnDataAgg));
...@@ -3835,11 +3819,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL ...@@ -3835,11 +3819,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pCond->twindows.ekey -= 1; pCond->twindows.ekey -= 1;
} }
int32_t capacity = 0; int32_t capacity = pVnode->config.tsdbCfg.maxRows;
if (pResBlock == NULL) { if (pResBlock != NULL) {
capacity = 4096; blockDataEnsureCapacity(pResBlock, capacity);
} else {
capacity = pResBlock->info.capacity;
} }
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
......
...@@ -215,8 +215,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -215,8 +215,15 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);
continue;
}
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......
...@@ -781,6 +781,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -781,6 +781,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
}
} }
SSDataBlock* result = doGroupedTableScan(pOperator); SSDataBlock* result = doGroupedTableScan(pOperator);
...@@ -884,7 +888,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -884,7 +888,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); // blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -44,7 +44,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -44,7 +44,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pMeta->pTasks == NULL) { if (pMeta->pTasks == NULL) {
goto _err; goto _err;
} }
......
...@@ -420,7 +420,13 @@ static void transHttpEnvInit() { ...@@ -420,7 +420,13 @@ static void transHttpEnvInit() {
uv_loop_init(http->loop); uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (NULL == http->asyncPool) {
taosMemoryFree(http->loop);
taosMemoryFree(http);
http = NULL;
return;
}
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) { if (err != 0) {
taosMemoryFree(http->loop); taosMemoryFree(http->loop);
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "taoserror.h" #include "taoserror.h"
#define PROCESS_ITEM 12 #define PROCESS_ITEM 12
#define UUIDLEN37 37
typedef struct { typedef struct {
uint64_t user; uint64_t user;
...@@ -830,7 +831,8 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { ...@@ -830,7 +831,8 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
return 0; return 0;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
uuid_t uuid = {0}; uuid_t uuid = {0};
char buf[37] = {0}; char buf[UUIDLEN37];
memset(buf, 0, UUIDLEN37);
uuid_generate(uuid); uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, buf); uuid_unparse_lower(uuid, buf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册