提交 5e899afb 编写于 作者: H Hongze Cheng

Merge branch 'develop' into hotfix/boundary

......@@ -808,18 +808,19 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
}
}
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
SFillInfo *pFillInfo) {
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
// discard following dataset in the same group and reset the interpolation information
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosResetFillInfo(pFillInfo, revisedSTime);
if (pFillInfo != NULL) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
taosResetFillInfo(pFillInfo, revisedSTime);
}
pLocalReducer->discard = true;
pLocalReducer->discardData->num = 0;
......@@ -915,13 +916,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */
int32_t prevSize = pFinalDataPage->num;
int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
assert(overFlow < pRes->numOfRows);
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
assert(overflow < pRes->numOfRows);
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
pRes->numOfRows -= overFlow;
pFinalDataPage->num -= overFlow;
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
......@@ -988,13 +988,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
pRes->numOfRows -= overFlow;
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
pFinalDataPage->num -= overFlow;
pFinalDataPage->num -= overflow;
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
......
......@@ -106,9 +106,28 @@ typedef void *SDataRow;
SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema);
int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset);
SDataRow tdDataRowDup(SDataRow row);
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
memcpy(ptr, value, varDataTLen(value));
dataRowLen(row) += varDataTLen(value);
break;
default:
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
break;
}
return 0;
}
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
switch (type) {
......
......@@ -158,32 +158,6 @@ void tdFreeDataRow(SDataRow row) {
if (row) free(row);
}
/**
* Append a column value to the data row
* @param type: column type
* @param bytes: column bytes
* @param offset: offset in the data row tuple, not including the data row header
*/
int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = POINTER_SHIFT(row, dataRowLen(row));
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
memcpy(ptr, value, varDataTLen(value));
dataRowLen(row) += varDataTLen(value);
break;
default:
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
break;
}
return 0;
}
SDataRow tdDataRowDup(SDataRow row) {
SDataRow trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
......
......@@ -140,7 +140,7 @@ int32_t rpcDebugFlag = 135;
int32_t uDebugFlag = 131;
int32_t debugFlag = 131;
int32_t sDebugFlag = 135;
int32_t tsdbDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
......
......@@ -40,6 +40,7 @@ typedef struct {
int num; // number of continuous streams
struct SCqObj *pHead;
void *dbConn;
int master;
pthread_mutex_t mutex;
} SCqContext;
......@@ -58,6 +59,7 @@ typedef struct SCqObj {
int cqDebugFlag = 135;
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
......@@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
pContext->vgId = pCfg->vgId;
pContext->cqWrite = pCfg->cqWrite;
pContext->ahandle = ahandle;
tscEmbedded = 1;
pthread_mutex_init(&pContext->mutex, NULL);
......@@ -84,6 +87,8 @@ void cqClose(void *handle) {
cqStop(pContext);
// free all resources
pthread_mutex_lock(&pContext->mutex);
SCqObj *pObj = pContext->pHead;
while (pObj) {
SCqObj *pTemp = pObj;
......@@ -91,6 +96,8 @@ void cqClose(void *handle) {
free(pTemp);
}
pthread_mutex_unlock(&pContext->mutex);
pthread_mutex_destroy(&pContext->mutex);
cTrace("vgId:%d, CQ is closed", pContext->vgId);
......@@ -100,28 +107,15 @@ void cqClose(void *handle) {
void cqStart(void *handle) {
SCqContext *pContext = handle;
cTrace("vgId:%d, start all CQs", pContext->vgId);
if (pContext->dbConn) return;
if (pContext->dbConn || pContext->master) return;
pthread_mutex_lock(&pContext->mutex);
tscEmbedded = 1;
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
pthread_mutex_unlock(&pContext->mutex);
return;
}
pContext->master = 1;
SCqObj *pObj = pContext->pHead;
while (pObj) {
int64_t lastKey = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
cqCreateStream(pContext, pObj);
pObj = pObj->next;
}
......@@ -131,10 +125,11 @@ void cqStart(void *handle) {
void cqStop(void *handle) {
SCqContext *pContext = handle;
cTrace("vgId:%d, stop all CQs", pContext->vgId);
if (pContext->dbConn == NULL) return;
if (pContext->dbConn == NULL || pContext->master == 0) return;
pthread_mutex_lock(&pContext->mutex);
pContext->master = 0;
SCqObj *pObj = pContext->pHead;
while (pObj) {
if (pObj->pStream) {
......@@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
if (pContext->pHead) pContext->pHead->prev = pObj;
pContext->pHead = pObj;
if (pContext->dbConn) {
int64_t lastKey = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
}
}
cqCreateStream(pContext, pObj);
pthread_mutex_unlock(&pContext->mutex);
......@@ -218,6 +204,26 @@ void cqDrop(void *handle) {
pthread_mutex_lock(&pContext->mutex);
}
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if (pContext->dbConn == NULL) {
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
if (pContext->dbConn == NULL) {
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
}
return;
}
int64_t lastKey = 0;
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
if (pObj->pStream) {
pContext->num++;
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
} else {
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
}
}
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqObj *pObj = (SCqObj *)param;
SCqContext *pContext = pObj->pContext;
......
......@@ -3030,11 +3030,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
// order has change already!
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (!QUERY_IS_ASC_QUERY(pQuery)) {
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
} else {
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
}
// TODO validate the assertion
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
// } else {
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
// }
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
......@@ -3113,7 +3115,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order);
SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);
}
}
......@@ -4384,10 +4386,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
TSKEY nextKey = blockInfo.window.skey;
if (!isIntervalQuery(pQuery)) {
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step);
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
......
......@@ -16,7 +16,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx)
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
static int tsdbEstimateTableEncodeSize(STable *pTable);
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable);
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
/**
* Encode a TSDB table object as a binary content
......@@ -411,7 +411,7 @@ int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name),
tableId.tid, tableId.uid);
if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;
if (tsdbRemoveTableFromMeta(pMeta, pTable, true) < 0) return -1;
return 0;
......@@ -501,7 +501,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
return 0;
}
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx) {
if (pTable->type == TSDB_SUPER_TABLE) {
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
while (tSkipListIterNext(pIter)) {
......@@ -510,7 +510,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);
tsdbRemoveTableFromMeta(pMeta, tTable);
tsdbRemoveTableFromMeta(pMeta, tTable, false);
}
tSkipListDestroyIter(pIter);
......@@ -526,7 +526,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
}
} else {
pMeta->tables[pTable->tableId.tid] = NULL;
if (pTable->type == TSDB_CHILD_TABLE) {
if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) {
tsdbRemoveTableFromIndex(pMeta, pTable);
}
......
......@@ -545,9 +545,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL) {
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
......@@ -561,9 +562,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
}
}
if (pCheckInfo->iiter != NULL) {
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
......@@ -583,6 +585,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
} else {
pQueryHandle->realNumOfRows = binfo.rows;
cur->rows = binfo.rows;
cur->win = binfo.window;
cur->mixBlock = false;
cur->blockCompleted = true;
cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1);
}
}
} else { //desc order
......@@ -914,7 +922,10 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter);
}
int32_t start = -1;
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
int32_t remain = end - pos + 1;
......
......@@ -11,6 +11,19 @@
4. pip install src/connector/python/linux/python2 ; pip3 install
src/connector/python/linux/python3
> Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software
> Foundation since January 1, 2020, it is recommended that subsequent test case
> development be guaranteed to run correctly on Python3.
> For Python2, please consider being compatible if appropriate without
> additional burden.
>
> If you use some new Linux distribution like Ubuntu 20.04 which already do not
> include Python2, please do not install Python2-related packages.
>
> <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/> 
### How to run Python test suite
1. cd \<TDengine\>/tests/pytest
......@@ -211,13 +224,6 @@ def checkAffectedRows(self, expectAffectedRows):
...
> Note: Both Python2 and Python3 are currently supported by the Python test
> case. Since Python2 is no longer officially supported by January 1, 2020, it
> is recommended that subsequent test case development be guaranteed to run
> correctly on Python3. For Python2, please consider being compatible if
> appropriate without additional
> burden. <https://nakedsecurity.sophos.com/2020/01/03/python-is-dead-long-live-python/> 
### CI submission adoption principle.
- Every commit / PR compilation must pass. Currently, the warning is treated
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册