提交 67d45c6b 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

...@@ -331,9 +331,11 @@ endif(${BUILD_WITH_TRAFT}) ...@@ -331,9 +331,11 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV # LIBUV
if(${BUILD_WITH_UV}) if(${BUILD_WITH_UV})
if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows") if (TD_WINDOWS)
MESSAGE("Windows need set no-sign-compare") # There is no GetHostNameW function on win7.
add_compile_options(-Wno-sign-compare) file(READ "libuv/src/win/util.c" LIBUV_WIN_UTIL_CONTENT)
string(REPLACE "if (GetHostNameW(buf, UV_MAXHOSTNAMESIZE" "DWORD nSize = UV_MAXHOSTNAMESIZE;\n if (GetComputerNameW(buf, &nSize" LIBUV_WIN_UTIL_CONTENT "${LIBUV_WIN_UTIL_CONTENT}")
file(WRITE "libuv/src/win/util.c" "${LIBUV_WIN_UTIL_CONTENT}")
endif () endif ()
add_subdirectory(libuv EXCLUDE_FROM_ALL) add_subdirectory(libuv EXCLUDE_FROM_ALL)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
......
...@@ -924,9 +924,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -924,9 +924,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild); SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo); SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......
...@@ -429,7 +429,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray ...@@ -429,7 +429,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, uint64_t suid, SArray
for (int32_t i = 0; i < rows; i++) { for (int32_t i = 0; i < rows; i++) {
int64_t* uid = taosArrayGet(uidList, i); int64_t* uid = taosArrayGet(uidList, i);
void* tag = taosHashGet(tags, uid, sizeof(int64_t)); void* tag = taosHashGet(tags, uid, sizeof(int64_t));
ASSERT(tag); if (suid != 0) {
ASSERT(tag);
}
for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){ for(int32_t j = 0; j < taosArrayGetSize(pResBlock->pDataBlock); j++){
SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j); SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, j);
......
...@@ -871,7 +871,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ ...@@ -871,7 +871,6 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResultRow(SResultRow* result, uint64_t groupId, SHashObj* pUpdatedMap) {
return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap); return saveWinResult(result->win.skey, result->pageId, result->offset, groupId, pUpdatedMap);
;
} }
static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) { static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpdated) {
...@@ -1595,7 +1594,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1595,7 +1594,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
while (1) { while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
...@@ -1886,62 +1885,6 @@ _error: ...@@ -1886,62 +1885,6 @@ _error:
return NULL; return NULL;
} }
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) {
SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pOperator->pTaskInfo = pTaskInfo;
pInfo->inputOrder = TSDB_ORDER_ASC;
pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_STREAM;
pInfo->win = pTaskInfo->window;
pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, numOfRows);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "StreamTimeIntervalAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
destroyIntervalOperatorInfo(pInfo, numOfCols);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}
// todo handle multiple timeline cases. assume no timeline interweaving // todo handle multiple timeline cases. assume no timeline interweaving
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -3109,11 +3052,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { ...@@ -3109,11 +3052,12 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info; SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); SOperatorInfo* downstream = pOperator->pDownstream[0];
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
TSKEY maxTs = INT64_MIN; SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
TSKEY maxTs = INT64_MIN;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
......
...@@ -2822,6 +2822,29 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { ...@@ -2822,6 +2822,29 @@ static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t checkEvery(STranslateContext* pCxt, SValueNode* pInterval) {
int32_t len = strlen(pInterval->literal);
char *unit = &pInterval->literal[len - 1];
if (*unit == 'n' || *unit == 'y') {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE,
"Unsupported time unit in EVERY clause");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateInterpEvery(STranslateContext* pCxt, SNode** pEvery) {
int32_t code = TSDB_CODE_SUCCESS;
code = checkEvery(pCxt, (SValueNode *)(*pEvery));
if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, pEvery);
}
return code;
}
static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -2856,7 +2879,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -2856,7 +2879,7 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = translateExpr(pCxt, &pSelect->pRange); int32_t code = translateExpr(pCxt, &pSelect->pRange);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pSelect->pEvery); code = translateInterpEvery(pCxt, &pSelect->pEvery);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateInterpFill(pCxt, pSelect); code = translateInterpFill(pCxt, pSelect);
......
...@@ -199,10 +199,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) ...@@ -199,10 +199,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
if (pPageH) { if (pPageH) {
// copy the page content // copy the page content
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid)); memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
for (int nLoops = 0;;) {
if (pPageH->pPager) break;
if (++nLoops > 1000) {
sched_yield();
nLoops = 0;
}
}
pPage->pLruNext = NULL; pPage->pLruNext = NULL;
pPage->pPager = pPageH->pPager; pPage->pPager = pPageH->pPager;
memcpy(pPage->pData, pPageH->pData, pPage->pageSize); memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
tdbDebug("pcache/pPageH: %p %d %p %p", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage);
tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize); tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize);
pPage->kLen = pPageH->kLen; pPage->kLen = pPageH->kLen;
pPage->vLen = pPageH->vLen; pPage->vLen = pPageH->vLen;
......
...@@ -33,6 +33,8 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd); ...@@ -33,6 +33,8 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd);
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url); int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url);
int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype); int32_t cfgSetItem(SConfig *pConfig, const char *name, const char *value, ECfgSrcType stype);
extern char **environ;
SConfig *cfgInit() { SConfig *cfgInit() {
SConfig *pCfg = taosMemoryCalloc(1, sizeof(SConfig)); SConfig *pCfg = taosMemoryCalloc(1, sizeof(SConfig));
if (pCfg == NULL) { if (pCfg == NULL) {
...@@ -627,24 +629,17 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { ...@@ -627,24 +629,17 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
} }
int32_t cfgLoadFromEnvVar(SConfig *pConfig) { int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
char *line = NULL, *name, *value, *value2, *value3; char line[1024], *name, *value, *value2, *value3;
int32_t olen, vlen, vlen2, vlen3; int32_t olen, vlen, vlen2, vlen3;
int32_t code = 0; int32_t code = 0;
ssize_t _bytes = 0; char **pEnv = environ;
TdCmdPtr pCmd = taosOpenCmd("set"); line[1023] = 0;
if (pCmd == NULL) { while(*pEnv != NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
while (!taosEOFCmd(pCmd)) {
name = value = value2 = value3 = NULL; name = value = value2 = value3 = NULL;
olen = vlen = vlen2 = vlen3 = 0; olen = vlen = vlen2 = vlen3 = 0;
_bytes = taosGetLineCmd(pCmd, &line); strncpy(line, *pEnv, sizeof(line)-1);
if (_bytes < 0) { pEnv++;
break;
}
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
taosEnvToCfg(line, line); taosEnvToCfg(line, line);
paGetToken(line, &name, &olen); paGetToken(line, &name, &olen);
...@@ -671,9 +666,6 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) { ...@@ -671,9 +666,6 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
} }
} }
taosCloseCmd(&pCmd);
if (line != NULL) taosMemoryFreeClear(line);
uInfo("load from env variables cfg success"); uInfo("load from env variables cfg success");
return 0; return 0;
} }
...@@ -1040,34 +1032,25 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -1040,34 +1032,25 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
index++; index++;
} }
char *line = NULL; char line[1024];
ssize_t _bytes = 0; char **pEnv = environ;
TdCmdPtr pCmd = taosOpenCmd("set"); line[1023] = 0;
if (pCmd != NULL) { while(*pEnv != NULL) {
while (!taosEOFCmd(pCmd)) { strncpy(line, *pEnv, sizeof(line)-1);
_bytes = taosGetLineCmd(pCmd, &line); pEnv++;
if (_bytes < 0) { if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) {
break; char *p = strchr(line, '=');
} if (p != NULL) {
if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0; p++;
if (strncmp(line, "TAOS_APOLLO_URL", 14) == 0) { if (*p == '\'') {
char *p = strchr(line, '=');
if (p != NULL) {
p++; p++;
if (*p == '\'') { p[strlen(p)-1] = '\0';
p++;
p[strlen(p)-1] = '\0';
}
memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX));
uInfo("get apollo url from env variables success, apolloUrl=%s",apolloUrl);
taosCloseCmd(&pCmd);
if (line != NULL) taosMemoryFreeClear(line);
return 0;
} }
memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX));
uInfo("get apollo url from env variables success, apolloUrl=%s",apolloUrl);
return 0;
} }
} }
taosCloseCmd(&pCmd);
if (line != NULL) taosMemoryFreeClear(line);
} }
const char *filepath = ".env"; const char *filepath = ".env";
...@@ -1083,10 +1066,11 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -1083,10 +1066,11 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
return 0; return 0;
} }
} }
int64_t _bytes;
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
if (pFile != NULL) { if (pFile != NULL) {
while (!taosEOFFile(pFile)) { while (!taosEOFFile(pFile)) {
_bytes = taosGetLineFile(pFile, &line); _bytes = taosGetsFile(pFile, sizeof(line) - 1, line);
if (_bytes <= 0) { if (_bytes <= 0) {
break; break;
} }
...@@ -1101,14 +1085,12 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl ...@@ -1101,14 +1085,12 @@ int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char* apolloUrl
} }
memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX)); memcpy(apolloUrl, p, TMIN(strlen(p)+1,PATH_MAX));
taosCloseFile(&pFile); taosCloseFile(&pFile);
if (line != NULL) taosMemoryFreeClear(line);
uInfo("get apollo url from env file success"); uInfo("get apollo url from env file success");
return 0; return 0;
} }
} }
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
if (line != NULL) taosMemoryFreeClear(line);
} }
uInfo("fail get apollo url from cmd env file"); uInfo("fail get apollo url from cmd env file");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册