未验证 提交 ae2739b1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #7557 from taosdata/feature/m1

Feature/m1
...@@ -841,6 +841,11 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) { ...@@ -841,6 +841,11 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql) {
tableSerialize = totalTables * sizeof(STableIdInfo); tableSerialize = totalTables * sizeof(STableIdInfo);
} }
SCond* pCond = &pQueryInfo->tagCond.tbnameCond;
if (pCond->len > 0) {
srcColListSize += pCond->len;
}
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize + return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + srcColFilterSize + srcTagFilterSize +
exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen; exprSize + tsBufSize + tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
} }
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
#define _GNU_SOURCE #define _GNU_SOURCE
#include "os.h" #include "os.h"
#include "texpr.h" #include "texpr.h"
#include "tsched.h"
#include "qTsbuf.h" #include "qTsbuf.h"
#include "tcompare.h" #include "tcompare.h"
#include "tscLog.h" #include "tscLog.h"
...@@ -2425,6 +2426,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2425,6 +2426,26 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
return terrno; return terrno;
} }
typedef struct SPair {
int32_t first;
int32_t second;
} SPair;
static void doSendQueryReqs(SSchedMsg* pSchedMsg) {
SSqlObj* pSql = pSchedMsg->ahandle;
SPair* p = pSchedMsg->msg;
for(int32_t i = p->first; i < p->second; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex);
tscBuildAndSendRequest(pSub, NULL);
}
tfree(p);
}
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -2547,13 +2568,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -2547,13 +2568,33 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
doCleanupSubqueries(pSql, i); doCleanupSubqueries(pSql, i);
return pRes->code; return pRes->code;
} }
for(int32_t j = 0; j < pState->numOfSub; ++j) { // concurrently sent the query requests.
SSqlObj* pSub = pSql->pSubs[j]; const int32_t MAX_REQUEST_PER_TASK = 8;
SRetrieveSupport* pSupport = pSub->param;
int32_t numOfTasks = (pState->numOfSub + MAX_REQUEST_PER_TASK - 1)/MAX_REQUEST_PER_TASK;
tscDebug("0x%"PRIx64" sub:0x%"PRIx64" launch subquery, orderOfSub:%d.", pSql->self, pSub->self, pSupport->subqueryIndex); assert(numOfTasks >= 1);
tscBuildAndSendRequest(pSub, NULL);
int32_t num = (pState->numOfSub/numOfTasks) + 1;
tscDebug("0x%"PRIx64 " query will be sent by %d threads", pSql->self, numOfTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doSendQueryReqs;
schedMsg.ahandle = (void*)pSql;
schedMsg.thandle = NULL;
SPair* p = calloc(1, sizeof(SPair));
p->first = j * num;
if (j == numOfTasks - 1) {
p->second = pState->numOfSub;
} else {
p->second = (j + 1) * num;
}
schedMsg.msg = p;
taosScheduleTask(tscQhandle, &schedMsg);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -2,7 +2,7 @@ import taos ...@@ -2,7 +2,7 @@ import taos
conn = taos.connect(host='127.0.0.1', conn = taos.connect(host='127.0.0.1',
user='root', user='root',
passworkd='taodata', password='taosdata',
database='log') database='log')
cursor = conn.cursor() cursor = conn.cursor()
......
...@@ -50,14 +50,20 @@ void osInit() { ...@@ -50,14 +50,20 @@ void osInit() {
char* taosGetCmdlineByPID(int pid) { char* taosGetCmdlineByPID(int pid) {
static char cmdline[1024]; static char cmdline[1024];
sprintf(cmdline, "/proc/%d/cmdline", pid); sprintf(cmdline, "/proc/%d/cmdline", pid);
FILE* f = fopen(cmdline, "r");
if (f) { int fd = open(cmdline, O_RDONLY);
size_t size; if (fd >= 0) {
size = fread(cmdline, sizeof(char), 1024, f); int n = read(fd, cmdline, sizeof(cmdline) - 1);
if (size > 0) { if (n < 0) n = 0;
if ('\n' == cmdline[size - 1]) cmdline[size - 1] = '\0';
} if (n > 0 && cmdline[n - 1] == '\n') --n;
fclose(f);
cmdline[n] = 0;
close(fd);
} else {
cmdline[0] = 0;
} }
return cmdline; return cmdline;
} }
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#define monTrace(...) { if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }} #define monTrace(...) { if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
#define SQL_LENGTH 1030 #define SQL_LENGTH 1030
#define LOG_LEN_STR 100 #define LOG_LEN_STR 512
#define IP_LEN_STR TSDB_EP_LEN #define IP_LEN_STR TSDB_EP_LEN
#define CHECK_INTERVAL 1000 #define CHECK_INTERVAL 1000
......
...@@ -4089,7 +4089,7 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD ...@@ -4089,7 +4089,7 @@ static void mergeTableBlockDist(SResultRowCellInfo* pResInfo, const STableBlockD
} else { } else {
pDist->maxRows = pSrc->maxRows; pDist->maxRows = pSrc->maxRows;
pDist->minRows = pSrc->minRows; pDist->minRows = pSrc->minRows;
int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS; int32_t maxSteps = TSDB_MAX_MAX_ROW_FBLOCK/TSDB_BLOCK_DIST_STEP_ROWS;
if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) { if (TSDB_MAX_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++maxSteps; ++maxSteps;
...@@ -4223,7 +4223,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { ...@@ -4223,7 +4223,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
taosArrayDestroy(pDist->dataBlockInfos); taosArrayDestroy(pDist->dataBlockInfos);
pDist->dataBlockInfos = NULL; pDist->dataBlockInfos = NULL;
} }
// cannot set the numOfIteratedElems again since it is set during previous iteration // cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1; pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
......
...@@ -7449,10 +7449,12 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -7449,10 +7449,12 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput); pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput);
pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols); pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols);
pQueryMsg->tagCondLen = htonl(pQueryMsg->tagCondLen); pQueryMsg->tagCondLen = htonl(pQueryMsg->tagCondLen);
pQueryMsg->tsBuf.tsOffset = htonl(pQueryMsg->tsBuf.tsOffset); pQueryMsg->tsBuf.tsOffset = htonl(pQueryMsg->tsBuf.tsOffset);
pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen);
pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks);
pQueryMsg->tsBuf.tsOrder = htonl(pQueryMsg->tsBuf.tsOrder); pQueryMsg->tsBuf.tsOrder = htonl(pQueryMsg->tsBuf.tsOrder);
pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags); pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags);
pQueryMsg->tbnameCondLen = htonl(pQueryMsg->tbnameCondLen); pQueryMsg->tbnameCondLen = htonl(pQueryMsg->tbnameCondLen);
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput); pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
......
...@@ -2460,7 +2460,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist ...@@ -2460,7 +2460,7 @@ int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb)); tsdbUnLockFS(REPO_FS(pQueryHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pQueryHandle, tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pQueryHandle,
pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId); pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册