未验证 提交 4ba0e990 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #8486 from taosdata/feature/d6

merge master branch int develop
...@@ -201,8 +201,8 @@ pipeline { ...@@ -201,8 +201,8 @@ pipeline {
stage('pre_build'){ stage('pre_build'){
agent{label 'master'} agent{label 'master'}
options { skipDefaultCheckout() } options { skipDefaultCheckout() }
when{ when {
changeRequest() changeRequest()
} }
steps { steps {
script{ script{
...@@ -322,21 +322,9 @@ pipeline { ...@@ -322,21 +322,9 @@ pipeline {
''' '''
sh ''' sh '''
cd ${WKC}/src/connector/node-rest/ cd ${WKC}/tests/examples/C#/taosdemo
npm install mcs -out:taosdemo *.cs > /dev/null 2>&1
npm run build echo '' |./taosdemo -c /etc/taos
npm run build:test
npm run test
'''
sh '''
cd ${WKC}/tests/examples/C#/taosdemo
mcs -out:taosdemo *.cs > /dev/null 2>&1
echo '' |./taosdemo -c /etc/taos
cd ${WKC}/tests/connectorTest/C#Test/nanosupport
mcs -out:nano *.cs > /dev/null 2>&1
echo '' |./nano
''' '''
sh ''' sh '''
cd ${WKC}/tests/gotest cd ${WKC}/tests/gotest
......
...@@ -213,7 +213,7 @@ else ...@@ -213,7 +213,7 @@ else
exit 1 exit 1
fi fi
make make -j8
cd ${curr_dir} cd ${curr_dir}
......
...@@ -155,6 +155,7 @@ bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo); ...@@ -155,6 +155,7 @@ bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscGroupbyTag(SQueryInfo* pQueryInfo);
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo); int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo); bool hasTagValOutput(SQueryInfo* pQueryInfo);
......
...@@ -762,35 +762,32 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) { ...@@ -762,35 +762,32 @@ void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
char *pBlockData = pBlocks->data; char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
dataBuf->ordered = true;
int32_t i = 0; if(tsClientMerge) {
int32_t j = 1; int32_t i = 0;
int32_t j = 1;
while (j < pBlocks->numOfRows) { while (j < pBlocks->numOfRows) {
TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i); TSKEY ti = *(TSKEY *)(pBlockData + dataBuf->rowSize * i);
TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j); TSKEY tj = *(TSKEY *)(pBlockData + dataBuf->rowSize * j);
if (ti == tj) { if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize); memmove(pBlockData + dataBuf->rowSize * i, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
continue;
} }
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j; ++j;
continue; }
} pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlockData + dataBuf->rowSize * nextPos, pBlockData + dataBuf->rowSize * j, dataBuf->rowSize);
}
++j;
} }
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
} }
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
...@@ -836,32 +833,33 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk ...@@ -836,32 +833,33 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar); qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataCompar);
dataBuf->ordered = true;
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; if(tsClientMerge) {
int32_t i = 0; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t j = 1; int32_t i = 0;
while (j < nRows) { int32_t j = 1;
TSKEY ti = (pBlkKeyTuple + i)->skey; while (j < nRows) {
TSKEY tj = (pBlkKeyTuple + j)->skey; TSKEY ti = (pBlkKeyTuple + i)->skey;
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) { if (ti == tj) {
memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); if (dataBuf->pTableMeta && dataBuf->pTableMeta->tableInfo.update != TD_ROW_DISCARD_UPDATE) {
memmove(pBlkKeyTuple + i, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
continue;
} }
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j; ++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
} }
++j; pBlocks->numOfRows = i + 1;
} }
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
} }
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
......
...@@ -5022,6 +5022,7 @@ static int32_t validateJoinExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr ...@@ -5022,6 +5022,7 @@ static int32_t validateJoinExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr
const char* msg1 = "super table join requires tags column"; const char* msg1 = "super table join requires tags column";
const char* msg2 = "timestamp join condition missing"; const char* msg2 = "timestamp join condition missing";
const char* msg3 = "condition missing for join query"; const char* msg3 = "condition missing for join query";
const char* msg4 = "only ts column join allowed";
if (!QUERY_IS_JOIN_QUERY(pQueryInfo->type)) { if (!QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
if (pQueryInfo->numOfTables == 1) { if (pQueryInfo->numOfTables == 1) {
...@@ -5039,6 +5040,8 @@ static int32_t validateJoinExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr ...@@ -5039,6 +5040,8 @@ static int32_t validateJoinExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr
if (pCondExpr->pJoinExpr == NULL) { if (pCondExpr->pJoinExpr == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
} else if ((!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) && pCondExpr->pJoinExpr) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
} }
if (!pCondExpr->tsJoin) { if (!pCondExpr->tsJoin) {
...@@ -5593,10 +5596,14 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo ...@@ -5593,10 +5596,14 @@ int32_t validateFillNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNo
const char* msg4 = "illegal value or data overflow"; const char* msg4 = "illegal value or data overflow";
const char* msg5 = "fill only available for interval query"; const char* msg5 = "fill only available for interval query";
const char* msg6 = "not supported function now"; const char* msg6 = "not supported function now";
const char* msg7 = "join query not supported fill operation";
if ((!isTimeWindowQuery(pQueryInfo)) && (!tscIsPointInterpQuery(pQueryInfo))) { if ((!isTimeWindowQuery(pQueryInfo)) && (!tscIsPointInterpQuery(pQueryInfo))) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
if(QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
/* /*
* fill options are set at the end position, when all columns are set properly * fill options are set at the end position, when all columns are set properly
...@@ -5951,6 +5958,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq ...@@ -5951,6 +5958,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return invalidOperationMsg(pMsgBuf, msg11); return invalidOperationMsg(pMsgBuf, msg11);
} }
if (udf) {
return invalidOperationMsg(pMsgBuf, msg11);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder; pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
...@@ -9212,6 +9223,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -9212,6 +9223,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo); pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo); pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryInfo->groupbyTag = tscGroupbyTag(pQueryInfo);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
......
...@@ -752,7 +752,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, ...@@ -752,7 +752,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *,
void taos_close_stream(TAOS_STREAM *handle) { void taos_close_stream(TAOS_STREAM *handle) {
SSqlStream *pStream = (SSqlStream *)handle; SSqlStream *pStream = (SSqlStream *)handle;
SSqlObj *pSql = (SSqlObj *)atomic_exchange_ptr(&pStream->pSql, 0); SSqlObj *pSql = pStream->pSql;
if (pSql == NULL) { if (pSql == NULL) {
return; return;
} }
...@@ -763,13 +763,13 @@ void taos_close_stream(TAOS_STREAM *handle) { ...@@ -763,13 +763,13 @@ void taos_close_stream(TAOS_STREAM *handle) {
*/ */
if (pSql->signature == pSql) { if (pSql->signature == pSql) {
tscRemoveFromStreamList(pStream, pSql); tscRemoveFromStreamList(pStream, pSql);
pStream->pSql = NULL;
taosTmrStopA(&(pStream->pTimer)); taosTmrStopA(&(pStream->pTimer));
tscDebug("0x%"PRIx64" stream:%p is closed", pSql->self, pStream); tscDebug("0x%"PRIx64" stream:%p is closed", pSql->self, pStream);
// notify CQ to release the pStream object // notify CQ to release the pStream object
pStream->fp(pStream->param, NULL, NULL); pStream->fp(pStream->param, NULL, NULL);
pStream->pSql = NULL;
taos_free_result(pSql); taos_free_result(pSql);
tfree(pStream); tfree(pStream);
......
...@@ -2876,7 +2876,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2876,7 +2876,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
SSqlObj *userSql = pParentSql->rootObj; SSqlObj *userSql = pParentSql->rootObj;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) { if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) { if (userSql != pParentSql && pParentSql->freeParam != NULL) {
(*pParentSql->freeParam)(&pParentSql->param); (*pParentSql->freeParam)(&pParentSql->param);
} }
...@@ -3729,6 +3729,25 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -3729,6 +3729,25 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData; return hasData;
} }
void tscSetQuerySort(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr) {
if (pQueryInfo->interval.interval <= 0) {
return;
}
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < size; ++i) {
SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, i);
if (pq->groupbyTag && pq->interval.interval > 0) {
pQueryAttr->needSort = true;
return;
}
}
}
}
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator, void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
char* sql, void* merger, int32_t stage, uint64_t qId) { char* sql, void* merger, int32_t stage, uint64_t qId) {
assert(pQueryInfo != NULL); assert(pQueryInfo != NULL);
...@@ -3831,6 +3850,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr ...@@ -3831,6 +3850,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
SArray* pa = NULL; SArray* pa = NULL;
if (stage == MASTER_SCAN) { if (stage == MASTER_SCAN) {
pQueryAttr->createFilterOperator = false; // no need for parent query pQueryAttr->createFilterOperator = false; // no need for parent query
tscSetQuerySort(pQueryInfo, pQueryAttr);
pa = createExecOperatorPlan(pQueryAttr); pa = createExecOperatorPlan(pQueryAttr);
} else { } else {
pa = createGlobalMergePlan(pQueryAttr); pa = createGlobalMergePlan(pQueryAttr);
......
...@@ -414,6 +414,19 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo) { ...@@ -414,6 +414,19 @@ bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscGroupbyTag(SQueryInfo* pQueryInfo) {
SGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pIndex->flag)) { // group by tag
return true;
}
}
return false;
}
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo) { int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo); size_t numOfExprs = tscNumOfExprs(pQueryInfo);
...@@ -1256,6 +1269,7 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t ...@@ -1256,6 +1269,7 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t
} }
*/ */
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) { void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) {
SSqlRes* pOutput = &pSql->res; SSqlRes* pOutput = &pSql->res;
......
...@@ -339,7 +339,9 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } ...@@ -339,7 +339,9 @@ static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
int tdAllocMemForCol(SDataCol *pCol, int maxPoints); int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints);
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, int rowOffset);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -670,7 +672,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) { ...@@ -670,7 +672,7 @@ static FORCE_INLINE char *memRowEnd(SMemRow row) {
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row); SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull); void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset);
// NOTE: offset here including the header size // NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) { static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int16_t colId, int8_t colType, uint16_t offset) {
......
...@@ -217,6 +217,8 @@ extern int32_t wDebugFlag; ...@@ -217,6 +217,8 @@ extern int32_t wDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int8_t tsClientMerge;
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy // lossy
extern char lossyColumns[]; extern char lossyColumns[];
...@@ -232,6 +234,7 @@ extern int8_t tsDeadLockKillQuery; ...@@ -232,6 +234,7 @@ extern int8_t tsDeadLockKillQuery;
// schemaless // schemaless
extern char tsDefaultJSONStrType[]; extern char tsDefaultJSONStrType[];
typedef struct { typedef struct {
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
int level; int level;
......
...@@ -239,9 +239,12 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { ...@@ -239,9 +239,12 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0; pDataCol->len = 0;
} }
// value from timestamp should be TKEY here instead of TSKEY /**
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { * value from timestamp should be TKEY here instead of TSKEY.
ASSERT(pCol != NULL && value != NULL); * - rowOffset: 0 for current row, -1 for previous row
*/
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints, int rowOffset) {
ASSERT(pCol != NULL && value != NULL && (rowOffset == 0 || rowOffset == -1));
if (isAllRowsNull(pCol)) { if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) { if (isNull(value, pCol->type)) {
...@@ -257,16 +260,29 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo ...@@ -257,16 +260,29 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
} }
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset if (rowOffset == 0) {
pCol->dataOff[numOfRows] = pCol->len; // set offset
// Copy data pCol->dataOff[numOfRows] = pCol->len;
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); // Copy data
// Update the length memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
pCol->len += varDataTLen(value); // Update the length
pCol->len += varDataTLen(value);
} else {
// Copy data
void *lastValue = POINTER_SHIFT(pCol->pData, pCol->dataOff[numOfRows]);
int lastValLen = varDataTLen(lastValue);
memcpy(lastValue, value, varDataTLen(value));
// Update the length
pCol->len -= lastValLen;
pCol->len += varDataTLen(value);
}
} else { } else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); // update the value of last row with increasing the pCol->len and keeping the numOfRows for partial update
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); ASSERT(pCol->len == (TYPE_BYTES[pCol->type] * (numOfRows - rowOffset)));
pCol->len += pCol->bytes; memcpy(POINTER_SHIFT(pCol->pData, (pCol->len + rowOffset * TYPE_BYTES[pCol->type])), value, pCol->bytes);
if (rowOffset == 0) {
pCol->len += pCol->bytes;
}
} }
return 0; return 0;
} }
...@@ -441,7 +457,8 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -441,7 +457,8 @@ void tdResetDataCols(SDataCols *pCols) {
} }
} }
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull,
int rowOffset) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0; int rcol = 0;
...@@ -451,7 +468,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -451,7 +468,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
bool setCol = 0; bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
dcol++; dcol++;
continue; continue;
} }
...@@ -460,14 +477,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -460,14 +477,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
if (pRowCol->colId == pDataCol->colId) { if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1; if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
dcol++; dcol++;
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else { } else {
if(forceSetNull || setCol) { if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
} }
dcol++; dcol++;
} }
...@@ -475,7 +492,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols ...@@ -475,7 +492,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++; pCols->numOfRows++;
} }
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0; int rcol = 0;
...@@ -487,7 +504,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -487,7 +504,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
bool setCol = 0; bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
++dcol; ++dcol;
continue; continue;
} }
...@@ -497,14 +514,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -497,14 +514,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) { if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset); void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1; if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
++dcol; ++dcol;
++rcol; ++rcol;
} else if (colIdx->colId < pDataCol->colId) { } else if (colIdx->colId < pDataCol->colId) {
++rcol; ++rcol;
} else { } else {
if(forceSetNull || setCol) { if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
} }
++dcol; ++dcol;
} }
...@@ -512,11 +529,11 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo ...@@ -512,11 +529,11 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
pCols->numOfRows++; pCols->numOfRows++;
} }
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull, int rowOffset) {
if (isDataRow(row)) { if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull); tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull, rowOffset);
} else if (isKvRow(row)) { } else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull); tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull, rowOffset);
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -539,7 +556,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * ...@@ -539,7 +556,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) { if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints); target->maxPoints, 0);
} }
} }
target->numOfRows++; target->numOfRows++;
...@@ -583,7 +600,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i ...@@ -583,7 +600,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0 || target->cols[i].len > 0) { if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints); target->maxPoints, 0);
} }
} }
...@@ -595,10 +612,10 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i ...@@ -595,10 +612,10 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) { if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints); target->maxPoints, 0);
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints); target->maxPoints, 0);
} else if(target->cols[i].len > 0) { } else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows); dataColSetNullAt(&target->cols[i], target->numOfRows);
} }
......
...@@ -268,6 +268,8 @@ int32_t tsdbDebugFlag = 131; ...@@ -268,6 +268,8 @@ int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int8_t tsClientMerge = 0;
#ifdef TD_TSZ #ifdef TD_TSZ
// //
// lossy compress 6 // lossy compress 6
...@@ -1642,6 +1644,16 @@ static void doInitGlobalConfig(void) { ...@@ -1642,6 +1644,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "clientMerge";
cfg.ptr = &tsClientMerge;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 1;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// default JSON string type option "binary"/"nchar" // default JSON string type option "binary"/"nchar"
cfg.option = "defaultJSONStrType"; cfg.option = "defaultJSONStrType";
cfg.ptr = tsDefaultJSONStrType; cfg.ptr = tsDefaultJSONStrType;
...@@ -1715,9 +1727,9 @@ static void doInitGlobalConfig(void) { ...@@ -1715,9 +1727,9 @@ static void doInitGlobalConfig(void) {
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM); assert(tsGlobalConfigNum < TSDB_CFG_MAX_NUM);
#else #else
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5); assert(tsGlobalConfigNum < TSDB_CFG_MAX_NUM);
#endif #endif
} }
......
...@@ -121,7 +121,7 @@ public class Utils { ...@@ -121,7 +121,7 @@ public class Utils {
} }
private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) { private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) {
Matcher matcher = Pattern.compile("(values|,)\\s*(\\([^)]*\\))").matcher(preparedSql); Matcher matcher = Pattern.compile("(values||,)\\s*(\\([^)]*\\))").matcher(preparedSql);
while (matcher.find()) { while (matcher.find()) {
int start = matcher.start(2); int start = matcher.start(2);
int end = matcher.end(2); int end = matcher.end(2);
......
...@@ -518,7 +518,7 @@ public class SQLTest { ...@@ -518,7 +518,7 @@ public class SQLTest {
@Test @Test
public void testCase050() { public void testCase050() {
String sql = "select * from restful_test.t1, restful_test.t3 where t1.ts = t3.ts and t1.location = t3.location"; String sql = "select * from restful_test.t1, restful_test.t3 where t1.ts = t3.ts";
// when // when
ResultSet rs = executeQuery(connection, sql); ResultSet rs = executeQuery(connection, sql);
// then // then
......
...@@ -73,6 +73,48 @@ public class UtilsTest { ...@@ -73,6 +73,48 @@ public class UtilsTest {
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void multiValuesAndWhitespace() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?) (?,?,?,?) (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4) (200,3.1415,'xyz',5) (300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesNoSeparator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?)(?,?,?,?)(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4)(200,3.1415,'xyz',5)(300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesMultiSeparator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?) (?,?,?,?), (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5, 300, 3.141592, "uvw", 6).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4) (200,3.1415,'xyz',5), (300,3.141592,'uvw',6)";
Assert.assertEquals(expected, actual);
}
@Test @Test
public void lineTerminator() { public void lineTerminator() {
// given // given
...@@ -100,6 +142,32 @@ public class UtilsTest { ...@@ -100,6 +142,32 @@ public class UtilsTest {
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void lineTerminatorAndMultiValuesAndNoneOrMoreWhitespace() {
String nativeSql = "INSERT Into ? TAGS(?) VALUES(?,?,\r\n?,?),(?,? ,\r\n?,?) t? tags (?) Values (?,?,?\r\n,?) (?,?,?,?) t? Tags(?) values (?,?,?,?) , (?,?,?,?)";
Object[] parameters = Stream.of("t1", "abc", 100, 1.1, "xxx", "xxx", 200, 2.2, "xxx", "xxx", 2, "bcd", 300, 3.3, "xxx", "xxx", 400, 4.4, "xxx", "xxx", 3, "cde", 500, 5.5, "xxx", "xxx", 600, 6.6, "xxx", "xxx").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT Into t1 TAGS('abc') VALUES(100,1.1,\r\n'xxx','xxx'),(200,2.2 ,\r\n'xxx','xxx') t2 tags ('bcd') Values (300,3.3,'xxx'\r\n,'xxx') (400,4.4,'xxx','xxx') t3 Tags('cde') values (500,5.5,'xxx','xxx') , (600,6.6,'xxx','xxx')";
Assert.assertEquals(expected, actual);
}
@Test
public void multiValuesAndNoneOrMoreWhitespace() {
String nativeSql = "INSERT INTO ? USING traces TAGS (?, ?) VALUES (?, ?, ?, ?, ?, ?, ?) (?, ?, ?, ?, ?, ?, ?)";
Object[] parameters = Stream.of("t1", "t1", "t2", 1632968284000L, 111.111, 119.001, 0.4, 90, 99.1, "WGS84", 1632968285000L, 111.21109999999999, 120.001, 0.5, 91, 99.19999999999999, "WGS84").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO t1 USING traces TAGS ('t1', 't2') VALUES (1632968284000, 111.111, 119.001, 0.4, 90, 99.1, 'WGS84') (1632968285000, 111.21109999999999, 120.001, 0.5, 91, 99.19999999999999, 'WGS84')";
Assert.assertEquals(expected, actual);
}
@Test @Test
public void replaceNothing() { public void replaceNothing() {
// given // given
......
...@@ -26,6 +26,8 @@ ENDIF () ...@@ -26,6 +26,8 @@ ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
ELSEIF (TD_WINDOWS) ELSEIF (TD_WINDOWS)
ADD_DEFINITIONS(-DUNICODE)
ADD_DEFINITIONS(-D_UNICODE)
LIST(APPEND SRC ./src/shellEngine.c) LIST(APPEND SRC ./src/shellEngine.c)
LIST(APPEND SRC ./src/shellMain.c) LIST(APPEND SRC ./src/shellMain.c)
LIST(APPEND SRC ./src/shellWindows.c) LIST(APPEND SRC ./src/shellWindows.c)
......
...@@ -95,6 +95,9 @@ SShellArguments args = { ...@@ -95,6 +95,9 @@ SShellArguments args = {
*/ */
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */ /*setlocale(LC_ALL, "en_US.UTF-8"); */
#ifdef WINDOWS
SetConsoleOutputCP(CP_UTF8);
#endif
if (!checkVersion()) { if (!checkVersion()) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
......
...@@ -272,13 +272,16 @@ int32_t shellReadCommand(TAOS *con, char command[]) { ...@@ -272,13 +272,16 @@ int32_t shellReadCommand(TAOS *con, char command[]) {
cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE); cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE);
// Read input. // Read input.
char c; void *console = GetStdHandle(STD_INPUT_HANDLE);
unsigned long read;
wchar_t c;
char mbStr[16];
while (1) { while (1) {
c = getchar(); int ret = ReadConsole(console, &c, 1, &read, NULL);
int size = WideCharToMultiByte(CP_UTF8, 0, &c, read, mbStr, sizeof(mbStr), NULL, NULL);
mbStr[size] = 0;
switch (c) { switch (c) {
case '\n': case '\n':
case '\r':
if (isReadyGo(&cmd)) { if (isReadyGo(&cmd)) {
sprintf(command, "%s%s", cmd.buffer, cmd.command); sprintf(command, "%s%s", cmd.buffer, cmd.command);
free(cmd.buffer); free(cmd.buffer);
...@@ -291,8 +294,12 @@ int32_t shellReadCommand(TAOS *con, char command[]) { ...@@ -291,8 +294,12 @@ int32_t shellReadCommand(TAOS *con, char command[]) {
updateBuffer(&cmd); updateBuffer(&cmd);
} }
break; break;
case '\r':
break;
default: default:
insertChar(&cmd, c); for (int i = 0; i < size; ++i) {
insertChar(&cmd, mbStr[i]);
}
} }
} }
......
...@@ -3567,8 +3567,10 @@ static int postProceSql(char *host, uint16_t port, ...@@ -3567,8 +3567,10 @@ static int postProceSql(char *host, uint16_t port,
break; break;
received += bytes; received += bytes;
response_buf[RESP_BUF_LEN - 1] = '\0'; verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf);
response_buf[RESP_BUF_LEN - 1] = '\0';
if (strlen(response_buf)) { if (strlen(response_buf)) {
verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n", verbosePrint("%s() LN%d: received:%d resp_len:%d, response_buf:\n%s\n",
__func__, __LINE__, received, resp_len, response_buf); __func__, __LINE__, received, resp_len, response_buf);
...@@ -6611,7 +6613,6 @@ static int getRowDataFromSample( ...@@ -6611,7 +6613,6 @@ static int getRowDataFromSample(
stbInfo->sampleDataBuf stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos)); + stbInfo->lenOfOneRow * (*sampleUsePos));
} }
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++; (*sampleUsePos)++;
...@@ -11211,7 +11212,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -11211,7 +11212,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
} }
*/ */
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) { if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
#ifdef WINDOWS #ifdef WINDOWS
WSADATA wsaData; WSADATA wsaData;
...@@ -11237,7 +11237,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -11237,7 +11237,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->sockfd = sockfd; pThreadInfo->sockfd = sockfd;
} }
tsem_init(&(pThreadInfo->lock_sem), 0, 0); tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) { if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
......
...@@ -229,6 +229,7 @@ typedef struct SQueryAttr { ...@@ -229,6 +229,7 @@ typedef struct SQueryAttr {
bool stateWindow; // window State on sub/normal table bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock bool multigroupResult; // multigroup result can exist in one SSDataBlock
bool needSort; // need sort rowRes
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
......
...@@ -152,7 +152,8 @@ typedef struct SQueryInfo { ...@@ -152,7 +152,8 @@ typedef struct SQueryInfo {
struct SQueryInfo *pDownstream; struct SQueryInfo *pDownstream;
int32_t havingFieldNum; int32_t havingFieldNum;
bool stableQuery; bool stableQuery;
bool groupbyColumn; bool groupbyColumn;
bool groupbyTag;
bool simpleAgg; bool simpleAgg;
bool arithmeticOnAgg; bool arithmeticOnAgg;
bool projectionQuery; bool projectionQuery;
......
...@@ -1313,9 +1313,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx ...@@ -1313,9 +1313,6 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
if (pCtx[k].currentStage == MERGE_STAGE) { if (pCtx[k].currentStage == MERGE_STAGE) {
pCtx[k].order = TSDB_ORDER_ASC; pCtx[k].order = TSDB_ORDER_ASC;
} }
pCtx[k].startTs = pQueryAttr->window.skey;
if (pCtx[k].functionId < 0) { if (pCtx[k].functionId < 0) {
// load the script and exec // load the script and exec
SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo; SUdfInfo* pUdfInfo = pRuntimeEnv->pUdfInfo;
...@@ -5991,6 +5988,18 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { ...@@ -5991,6 +5988,18 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
return NULL; return NULL;
} }
static int32_t resRowCompare(const void *r1, const void *r2) {
SResultRow *res1 = *(SResultRow **)r1;
SResultRow *res2 = *(SResultRow **)r2;
if (res1->win.skey == res2->win.skey) {
return 0;
} else {
return res1->win.skey > res2->win.skey ? 1 : -1;
}
}
static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -6036,6 +6045,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -6036,6 +6045,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
pQueryAttr->window = win; pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
if (pIntervalInfo->resultRowInfo.size > 0 && pQueryAttr->needSort) {
qsort(pIntervalInfo->resultRowInfo.pResult, pIntervalInfo->resultRowInfo.size, POINTER_BYTES, resRowCompare);
}
closeAllResultRows(&pIntervalInfo->resultRowInfo); closeAllResultRows(&pIntervalInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
......
...@@ -41,6 +41,7 @@ typedef struct STable { ...@@ -41,6 +41,7 @@ typedef struct STable {
int16_t restoreColumnNum; int16_t restoreColumnNum;
bool hasRestoreLastColumn; bool hasRestoreLastColumn;
int lastColSVersion; int lastColSVersion;
int16_t cacheLastConfigVersion;
T_REF_DECLARE() T_REF_DECLARE()
} STable; } STable;
......
...@@ -79,8 +79,8 @@ struct STsdbRepo { ...@@ -79,8 +79,8 @@ struct STsdbRepo {
STsdbCfg save_config; // save apply config STsdbCfg save_config; // save apply config
bool config_changed; // config changed flag bool config_changed; // config changed flag
pthread_mutex_t save_mutex; // protect save config pthread_mutex_t save_mutex; // protect save config
uint8_t hasCachedLastColumn; int16_t cacheLastConfigVersion;
STsdbAppH appH; STsdbAppH appH;
STsdbStat stat; STsdbStat stat;
...@@ -111,7 +111,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo); ...@@ -111,7 +111,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbCheckCommit(STsdbRepo* pRepo);
int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo);
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg);
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable);
void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetRootDir(int repoid, char dirName[]);
void tsdbGetDataDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]);
......
...@@ -1468,7 +1468,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1468,7 +1468,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail //TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints); pTarget->maxPoints, 0);
} }
pTarget->numOfRows++; pTarget->numOfRows++;
...@@ -1480,7 +1480,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1480,7 +1480,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, true); tdAppendMemRowToDataCol(row, pSchema, pTarget, true, 0);
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
...@@ -1489,7 +1489,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1489,7 +1489,7 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail //TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints); pTarget->maxPoints, 0);
} }
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
...@@ -1502,7 +1502,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1502,7 +1502,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE,
update != TD_ROW_PARTIAL_UPDATE ? 0 : -1);
} }
(*iter)++; (*iter)++;
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
......
...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { ...@@ -146,7 +146,9 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) {
if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) {
if (tsdbLockRepo(pRepo) < 0) return; if (tsdbLockRepo(pRepo) < 0) return;
tsdbCacheLastData(pRepo, &oldCfg); // tsdbCacheLastData(pRepo, &oldCfg);
// lazy load last cache when query or update
++pRepo->cacheLastConfigVersion;
tsdbUnlockRepo(pRepo); tsdbUnlockRepo(pRepo);
} }
......
...@@ -576,7 +576,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { ...@@ -576,7 +576,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
return NULL; return NULL;
} }
pRepo->config_changed = false; pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastColumn, 0); pRepo->cacheLastConfigVersion = 0;
code = tsem_init(&(pRepo->readyToCommit), 0, 1); code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
...@@ -726,7 +726,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -726,7 +726,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
loadStatisData = true; loadStatisData = true;
} }
} }
TSDB_WLOCK_TABLE(pTable); // lock when update pTable->lastCols[]
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) { for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i); STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns // ignore loaded columns
...@@ -775,6 +775,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -775,6 +775,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
break; break;
} }
} }
TSDB_WUNLOCK_TABLE(pTable);
} }
out: out:
...@@ -803,20 +804,33 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, ...@@ -803,20 +804,33 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
// Get the data in row // Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable); STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); SMemRow lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) { if (lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
memRowSetType(pTable->lastRow, SMEM_ROW_DATA); memRowSetType(lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema); tdInitDataRow(memRowDataBody(lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) { for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol); STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, tdAppendColVal(memRowDataBody(lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset); pCol->offset);
} }
TSKEY lastKey = memRowKey(lastRow);
// during the load data in file, new data would be inserted and last row has been updated
TSDB_WLOCK_TABLE(pTable);
if (pTable->lastRow == NULL) {
pTable->lastKey = lastKey;
pTable->lastRow = lastRow;
TSDB_WUNLOCK_TABLE(pTable);
} else {
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(lastRow);
}
return 0; return 0;
} }
...@@ -887,14 +901,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { ...@@ -887,14 +901,105 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (CACHE_LAST_NULL_COLUMN(pCfg)) { // if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
// }
return 0;
}
int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config));
bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config));
tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol);
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
if (!cacheLastRow && pTable->lastRow != NULL) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
}
if (!cacheLastCol && pTable->lastCols != NULL) {
tsdbFreeLastColumns(pTable);
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0;
cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0;
if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) {
return 0;
}
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
} }
tsdbRLockFS(REPO_FS(pRepo));
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((cacheLastRowTableNum > 0 || cacheLastColTableNum > 0) && (pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
// tsdbDebug("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && (cacheLastRowTableNum > 0) && (pTable->lastRow == NULL)) {
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && (cacheLastColTableNum > 0) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
tsdbUnLockFS(REPO_FS(pRepo));
tsdbDestroyReadH(&readh);
return 0; return 0;
} }
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false; bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter; SFSIter fsiter;
SReadH readh; SReadH readh;
...@@ -928,9 +1033,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -928,9 +1033,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
// if close last option,need to free data // if close last option,need to free data
if (need_free_last_row || need_free_last_col) { if (need_free_last_row || need_free_last_col) {
if (need_free_last_col) { // if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0); // atomic_store_8(&pRepo->hasCachedLastColumn, 0);
} // }
tsdbInfo("free cache last data since cacheLast option changed"); tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i <= maxTableIdx; i++) { for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i]; STable *pTable = pMeta->tables[i];
...@@ -1008,9 +1113,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { ...@@ -1008,9 +1113,9 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
tsdbDestroyReadH(&readh); tsdbDestroyReadH(&readh);
if (cacheLastCol) { // if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1); // atomic_store_8(&pRepo->hasCachedLastColumn, 1);
} // }
return 0; return 0;
} }
...@@ -594,7 +594,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * ...@@ -594,7 +594,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
} }
} }
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true); tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
} }
return 0; return 0;
...@@ -1001,7 +1001,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1001,7 +1001,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
if ((value == NULL) || isNull(value, pTCol->type)) { if ((value == NULL) || isNull(value, pTCol->type)) {
continue; continue;
} }
// lock
TSDB_WLOCK_TABLE(pTable);
SDataCol *pDataCol = &(pLatestCols[idx]); SDataCol *pDataCol = &(pLatestCols[idx]);
if (pDataCol->pData == NULL) { if (pDataCol->pData == NULL) {
pDataCol->pData = malloc(pTCol->bytes); pDataCol->pData = malloc(pTCol->bytes);
...@@ -1017,6 +1018,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro ...@@ -1017,6 +1018,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
memcpy(pDataCol->pData, value, bytes); memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts = memRowKey(row); pDataCol->ts = memRowKey(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
} }
} }
...@@ -1063,5 +1066,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r ...@@ -1063,5 +1066,8 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
updateTableLatestColumn(pRepo, pTable, row); updateTableLatestColumn(pRepo, pTable, row);
} }
} }
pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion;
return 0; return 0;
} }
...@@ -639,27 +639,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) { ...@@ -639,27 +639,30 @@ int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId) {
} }
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) { int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema) {
ASSERT(pTable->lastCols == NULL); TSDB_WLOCK_TABLE(pTable);
if (pTable->lastCols == NULL) {
int16_t numOfColumn = pSchema->numOfCols;
int16_t numOfColumn = pSchema->numOfCols; pTable->lastCols = (SDataCol *)malloc(numOfColumn * sizeof(SDataCol));
if (pTable->lastCols == NULL) {
TSDB_WUNLOCK_TABLE(pTable);
return -1;
}
pTable->lastCols = (SDataCol*)malloc(numOfColumn * sizeof(SDataCol)); for (int16_t i = 0; i < numOfColumn; ++i) {
if (pTable->lastCols == NULL) { STColumn *pCol = schemaColAt(pSchema, i);
return -1; SDataCol *pDataCol = &(pTable->lastCols[i]);
} pDataCol->bytes = 0;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
}
for (int16_t i = 0; i < numOfColumn; ++i) { pTable->lastColSVersion = schemaVersion(pSchema);
STColumn *pCol = schemaColAt(pSchema, i); pTable->maxColNum = numOfColumn;
SDataCol* pDataCol = &(pTable->lastCols[i]); pTable->restoreColumnNum = 0;
pDataCol->bytes = 0; pTable->hasRestoreLastColumn = false;
pDataCol->pData = NULL;
pDataCol->colId = pCol->colId;
} }
TSDB_WUNLOCK_TABLE(pTable);
pTable->lastColSVersion = schemaVersion(pSchema);
pTable->maxColNum = numOfColumn;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
return 0; return 0;
} }
...@@ -809,6 +812,7 @@ static STable *tsdbNewTable() { ...@@ -809,6 +812,7 @@ static STable *tsdbNewTable() {
pTable->lastCols = NULL; pTable->lastCols = NULL;
pTable->restoreColumnNum = 0; pTable->restoreColumnNum = 0;
pTable->cacheLastConfigVersion = 0;
pTable->maxColNum = 0; pTable->maxColNum = 0;
pTable->hasRestoreLastColumn = false; pTable->hasRestoreLastColumn = false;
pTable->lastColSVersion = -1; pTable->lastColSVersion = -1;
......
...@@ -157,6 +157,7 @@ typedef struct STableGroupSupporter { ...@@ -157,6 +157,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
...@@ -591,6 +592,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon ...@@ -591,6 +592,28 @@ void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCon
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next); pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
} }
static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
STsdbRepo* pRepo = pQueryHandle->pTsdb;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
int32_t code = 0;
for (size_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
STable* pTable = pCheckInfo->pTableObj;
if (pTable->cacheLastConfigVersion == pRepo->cacheLastConfigVersion) {
continue;
}
code = tsdbLoadLastCache(pRepo, pTable);
if (code != 0) {
tsdbError("%p uid:%" PRId64 ", tid:%d, failed to load last cache since %s", pQueryHandle, pTable->tableId.uid,
pTable->tableId.tid, tstrerror(terrno));
break;
}
}
return code;
}
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
pCond->twindow = updateLastrowForEachGroup(groupList); pCond->twindow = updateLastrowForEachGroup(groupList);
...@@ -604,6 +627,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -604,6 +627,8 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLastRow(pQueryHandle, groupList); int32_t code = checkForCachedLastRow(pQueryHandle, groupList);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -618,13 +643,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable ...@@ -618,13 +643,14 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
return pQueryHandle; return pQueryHandle;
} }
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) { TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef);
if (pQueryHandle == NULL) { if (pQueryHandle == NULL) {
return NULL; return NULL;
} }
lazyLoadCacheLast(pQueryHandle);
int32_t code = checkForCachedLast(pQueryHandle); int32_t code = checkForCachedLast(pQueryHandle);
if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0 if (code != TSDB_CODE_SUCCESS) { // set the numOfTables to be 0
terrno = code; terrno = code;
...@@ -2758,6 +2784,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2758,6 +2784,9 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
} }
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
// lock pTable->lastCols[i] as it would be released when schema update(tsdbUpdateLastColSchema)
TSDB_RLOCK_TABLE(pTable);
while(i < tgNumOfCols && j < numOfCols) { while(i < tgNumOfCols && j < numOfCols) {
pColInfo = taosArrayGet(pQueryHandle->pColumns, i); pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pTable->lastCols[j].colId < pColInfo->info.colId) { if (pTable->lastCols[j].colId < pColInfo->info.colId) {
...@@ -2844,6 +2873,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -2844,6 +2873,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) {
i++; i++;
j++; j++;
} }
TSDB_RUNLOCK_TABLE(pTable);
// leave the real ts column as the last row, because last function only (not stable) use the last row as res // leave the real ts column as the last row, because last function only (not stable) use the last row as res
if (priKey != TSKEY_INITIAL_VAL) { if (priKey != TSKEY_INITIAL_VAL) {
...@@ -3175,7 +3205,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) { ...@@ -3175,7 +3205,9 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0; int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){ STsdbRepo* pRepo = pQueryHandle->pTsdb;
if (pRepo && CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST; pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 125 #define TSDB_CFG_MAX_NUM 128
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -346,7 +346,7 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const ...@@ -346,7 +346,7 @@ int tsCompressBoolImp(const char *const input, const int nelements, char *const
/* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */ /* t = (~((( uint8_t)1) << (7-i%BITS_PER_BYTE))); */
output[pos] |= t; output[pos] |= t;
} else { } else {
uError("Invalid compress bool value:%d", output[pos]); uError("Invalid compress bool value:%d", input[i]);
return -1; return -1;
} }
} }
......
...@@ -144,7 +144,6 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -144,7 +144,6 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
// backward to put the first data // backward to put the first data
hasDup = tSkipListGetPosToPut(pSkipList, backward, pData); hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
tSkipListPutImpl(pSkipList, pData, backward, false, hasDup); tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
for (int level = 0; level < pSkipList->maxLevel; level++) { for (int level = 0; level < pSkipList->maxLevel; level++) {
...@@ -163,7 +162,12 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -163,7 +162,12 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
for (int i = 0; i < pSkipList->maxLevel; i++) { for (int i = 0; i < pSkipList->maxLevel; i++) {
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i); forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
} }
} else if(compare == 0) {
// same need special deal
forward[0] = SL_NODE_GET_BACKWARD_POINTER(SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail,0),0);
hasDup = true;
} else { } else {
SSkipListNode *p = NULL;
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pHead;
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) { for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
if (i < pSkipList->level) { if (i < pSkipList->level) {
...@@ -175,19 +179,29 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it ...@@ -175,19 +179,29 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
} }
} }
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i); // if px not head , must compare with px
if(px == pSkipList->pHead) {
p = SL_NODE_GET_FORWARD_POINTER(px, i);
} else {
p = px;
}
while (p != pSkipList->pTail) { while (p != pSkipList->pTail) {
pKey = SL_GET_NODE_KEY(pSkipList, p); pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey); compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) { if (compare >= 0) {
if (compare == 0 && !hasDup) hasDup = true; if (compare == 0) {
hasDup = true;
forward[0] = SL_NODE_GET_BACKWARD_POINTER(p, 0);
}
break; break;
} else { } else {
px = p; px = p;
p = SL_NODE_GET_FORWARD_POINTER(px, i); p = SL_NODE_GET_FORWARD_POINTER(px, i);
} }
} }
// if found duplicate, immediately break, needn't continue to loop set rest forward[i] value
if(hasDup) break;
} }
forward[i] = px; forward[i] = px;
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import os
import sys
sys.path.insert(0, os.getcwd())
from util.log import *
from util.sql import *
from util.dnodes import *
import taos
import threading
import subprocess
from random import choice
class TwoClients:
def initConnection(self):
self.host = "chenhaoran01"
self.user = "root"
self.password = "taosdata"
self.config = "/home/chr/cfg/single/"
self.port =6030
self.rowNum = 10
self.ts = 1537146000000
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def run(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
walFilePath = "/var/lib/taos/mnode_bak/wal/"
# new taos client
conn1 = taos.connect(host=self.host, user=self.user, password=self.password, config=self.config )
print(conn1)
cur1 = conn1.cursor()
tdSql.init(cur1, True)
# create backgroud db and tb
tdSql.execute("drop database if exists db1")
os.system("%staosdemo -f compress/insertDataDb1.json -y " % binPath)
# create foreground db and tb
tdSql.execute("drop database if exists foredb")
tdSql.execute("create database foredb")
tdSql.execute("use foredb")
print("123test")
tdSql.execute("create stable if not exists stb (ts timestamp, dataInt int, dataDouble double,dataStr nchar(200)) tags(loc nchar(50),t1 int)")
tdSql.execute("create table tb1 using stb tags('beijing1', 10)")
tdSql.execute("insert into tb1 values(1614218412000,8635,98.861,'qazwsxedcrfvtgbyhnujmikolp1')(1614218422000,8636,98.862,'qazwsxedcrfvtgbyhnujmikolp2')")
tdSql.execute("create table tb2 using stb tags('beijing2', 11)")
tdSql.execute("insert into tb2 values(1614218432000,8647,98.863,'qazwsxedcrfvtgbyhnujmikolp3')")
tdSql.execute("insert into tb2 values(1614218442000,8648,98.864,'qazwsxedcrfvtgbyhnujmikolp4')")
# check data correct
tdSql.execute("use db1")
tdSql.query("select count(tbname) from stb0")
tdSql.checkData(0, 0, 50000)
tdSql.query("select count(*) from stb0")
tdSql.checkData(0, 0, 5000000)
tdSql.execute("use foredb")
tdSql.query("select count (tbname) from stb")
tdSql.checkData(0, 0, 2)
tdSql.query("select count (*) from stb")
tdSql.checkData(0, 0, 4)
tdSql.query("select * from tb1 order by ts")
tdSql.checkData(0, 3, "qazwsxedcrfvtgbyhnujmikolp1")
tdSql.query("select * from tb2 order by ts")
tdSql.checkData(1, 3, "qazwsxedcrfvtgbyhnujmikolp4")
# delete useless file
testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf ./insert_res.txt")
# os.system("rm -rf compress/%s.sql" % testcaseFilename )
clients = TwoClients()
clients.initConnection()
# clients.getBuildPath()
clients.run()
\ No newline at end of file
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"interlace_rows": 10,
"num_of_records_per_req": 1000,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db1",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 50,
"blocks": 8,
"precision": "ms",
"keep": 3650,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb0",
"child_table_exists":"no",
"childtable_count": 50000,
"childtable_prefix": "stb00_",
"auto_create_table": "no",
"batch_create_tbl_num": 100,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 100,
"childtable_limit": 0,
"childtable_offset":0,
"interlace_rows": 0,
"insert_interval":0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"},{"type": "TINYINT"},{"type": "smallint"},{"type": "bool"},{"type": "bigint"},{"type": "float"},{"type": "double"}, {"type": "BINARY","len": 32}, {"type": "nchar","len": 32}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":1}]
}]
}]
}
...@@ -273,6 +273,7 @@ python3 ./test.py -f query/queryStateWindow.py ...@@ -273,6 +273,7 @@ python3 ./test.py -f query/queryStateWindow.py
# python3 ./test.py -f query/nestedQuery/queryWithOrderLimit.py # python3 ./test.py -f query/nestedQuery/queryWithOrderLimit.py
python3 ./test.py -f query/nestquery_last_row.py python3 ./test.py -f query/nestquery_last_row.py
python3 ./test.py -f query/nestedQuery/nestedQuery.py python3 ./test.py -f query/nestedQuery/nestedQuery.py
python3 ./test.py -f query/nestedQuery/nestedQuery_datacheck.py
python3 ./test.py -f query/queryCnameDisplay.py python3 ./test.py -f query/queryCnameDisplay.py
# python3 ./test.py -f query/operator_cost.py # python3 ./test.py -f query/operator_cost.py
# python3 ./test.py -f query/long_where_query.py # python3 ./test.py -f query/long_where_query.py
......
...@@ -61,6 +61,12 @@ class TDTestCase: ...@@ -61,6 +61,12 @@ class TDTestCase:
tdSql.query("select count(*) from stb") tdSql.query("select count(*) from stb")
tdSql.checkData(0, 0, 4096) tdSql.checkData(0, 0, 4096)
sql = "create table stb(ts timestamp, "
for i in range(15):
sql += "col%d binary(1022), " % (i + 1)
sql += "col1023 binary(1015))"
tdSql.error(sql)
endTime = time.time() endTime = time.time()
sql = "create table stb(ts timestamp, " sql = "create table stb(ts timestamp, "
......
...@@ -1714,7 +1714,6 @@ class TDTestCase: ...@@ -1714,7 +1714,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_where) sql += "%s " % random.choice(q_u_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1731,7 +1730,6 @@ class TDTestCase: ...@@ -1731,7 +1730,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_or_where) sql += "%s " % random.choice(q_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1767,7 +1765,6 @@ class TDTestCase: ...@@ -1767,7 +1765,6 @@ class TDTestCase:
sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_where) sql += "%s " % random.choice(q_u_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1784,7 +1781,6 @@ class TDTestCase: ...@@ -1784,7 +1781,6 @@ class TDTestCase:
sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(q_u_or_where) sql += "%s " % random.choice(q_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1818,7 +1814,6 @@ class TDTestCase: ...@@ -1818,7 +1814,6 @@ class TDTestCase:
sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(t_join_where) sql += "%s " % random.choice(t_join_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -1835,7 +1830,6 @@ class TDTestCase: ...@@ -1835,7 +1830,6 @@ class TDTestCase:
sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1, stable_2 t2 where t1.ts = t2.ts and "
sql += "%s " % random.choice(qt_u_or_where) sql += "%s " % random.choice(qt_u_or_where)
sql += "%s " % random.choice(session_u_where) sql += "%s " % random.choice(session_u_where)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2015,7 +2009,6 @@ class TDTestCase: ...@@ -2015,7 +2009,6 @@ class TDTestCase:
sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and "
sql += "%s and " % random.choice(t_join_where) sql += "%s and " % random.choice(t_join_where)
sql += "%s " % random.choice(interp_where_j) sql += "%s " % random.choice(interp_where_j)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2032,7 +2025,6 @@ class TDTestCase: ...@@ -2032,7 +2025,6 @@ class TDTestCase:
sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and " sql += " from stable_1 t1 , stable_2 t2 where t1.ts = t2.ts and "
sql += "%s and " % random.choice(qt_u_or_where) sql += "%s and " % random.choice(qt_u_or_where)
sql += "%s " % random.choice(interp_where_j) sql += "%s " % random.choice(interp_where_j)
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2065,7 +2057,6 @@ class TDTestCase: ...@@ -2065,7 +2057,6 @@ class TDTestCase:
sql += " from table_0 t1, table_1 t2 where t1.ts = t2.ts and " sql += " from table_0 t1, table_1 t2 where t1.ts = t2.ts and "
#sql += "%s and " % random.choice(t_join_where) #sql += "%s and " % random.choice(t_join_where)
sql += "%s " % interp_where_j[random.randint(0,5)] sql += "%s " % interp_where_j[random.randint(0,5)]
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
...@@ -2116,7 +2107,6 @@ class TDTestCase: ...@@ -2116,7 +2107,6 @@ class TDTestCase:
sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and " sql += " from regular_table_1 t1, regular_table_2 t2 where t1.ts = t2.ts and "
#sql += "%s " % random.choice(interp_where_j) #sql += "%s " % random.choice(interp_where_j)
sql += "%s " % interp_where_j[random.randint(0,5)] sql += "%s " % interp_where_j[random.randint(0,5)]
sql += "%s " % random.choice(fill_where)
sql += "%s " % random.choice(order_u_where) sql += "%s " % random.choice(order_u_where)
sql += "%s " % random.choice(limit_u_where) sql += "%s " % random.choice(limit_u_where)
sql += ") " sql += ") "
......
此差异已折叠。
...@@ -103,7 +103,6 @@ class TDTestCase: ...@@ -103,7 +103,6 @@ class TDTestCase:
select count(*) as count, loc from st where ts between 1600000000000 and 1600000000010 group by loc''') select count(*) as count, loc from st where ts between 1600000000000 and 1600000000010 group by loc''')
tdSql.checkRows(6) tdSql.checkRows(6)
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
...@@ -63,6 +63,7 @@ class TDTestCase: ...@@ -63,6 +63,7 @@ class TDTestCase:
tdLog.sleep(3) tdLog.sleep(3)
# test case for https://jira.taosdata.com:18080/browse/TS-402 # test case for https://jira.taosdata.com:18080/browse/TS-402
tdLog.info("test case for update option 1")
tdSql.execute("create database test update 1") tdSql.execute("create database test update 1")
tdSql.execute("use test") tdSql.execute("use test")
...@@ -75,7 +76,39 @@ class TDTestCase: ...@@ -75,7 +76,39 @@ class TDTestCase:
tdSql.checkData(0, 2, None) tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, 9) tdSql.checkData(0, 3, 9)
tdSql.execute("drop table if exists tb")
tdSql.execute("create table tb (ts timestamp, c1 int, c2 int, c3 int)")
tdSql.execute("insert into tb values(%d, 1, 2, 3)(%d, null, 4, 5)(%d, 6, null, 7)" % (self.ts, self.ts, self.ts))
tdSql.query("select * from tb")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 6)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, 7)
# https://jira.taosdata.com:18080/browse/TS-424
tdLog.info("test case for update option 2")
tdSql.execute("create database db2 update 2")
tdSql.execute("use db2")
tdSql.execute("create table tb (ts timestamp, c1 int, c2 int, c3 int)")
tdSql.execute("insert into tb values(%d, 1, 2, 3)(%d, null, null, 9)" % (self.ts, self.ts))
tdSql.query("select * from tb")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 2)
tdSql.checkData(0, 3, 9)
tdSql.execute("drop table if exists tb")
tdSql.execute("create table tb (ts timestamp, c1 int, c2 int, c3 int)")
tdSql.execute("insert into tb values(%d, 1, 2, 3)(%d, null, 4, 5)(%d, 6, null, 7)" % (self.ts, self.ts, self.ts))
tdSql.query("select * from tb")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 6)
tdSql.checkData(0, 2, 4)
tdSql.checkData(0, 3, 7)
def stop(self): def stop(self):
......
...@@ -445,7 +445,7 @@ if $rows != $val then ...@@ -445,7 +445,7 @@ if $rows != $val then
endi endi
print ================>TD-5600 print ================>TD-5600
sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s) fill(linear); sql select first(join_tb0.c8),first(join_tb0.c9) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts <= 100002 and join_tb1.ts>=100000 interval(1s);
#=============================================================== #===============================================================
......
...@@ -549,4 +549,381 @@ if $data11 != 2.000000000 then ...@@ -549,4 +549,381 @@ if $data11 != 2.000000000 then
return -1 return -1
endi endi
sql create database test2;
sql use test2;
sql create table meters (ts TIMESTAMP,a INT,b INT) TAGS (area INT);
sql CREATE TABLE t0 USING meters TAGS (0);
sql CREATE TABLE t1 USING meters TAGS (1);
sql CREATE TABLE t2 USING meters TAGS (1);
sql CREATE TABLE t3 USING meters TAGS (0);
sql insert into t0 values ('2021-09-30 15:00:00.00',0,0);
sql insert into t0 values ('2021-09-30 15:00:01.00',1,1);
sql insert into t0 values ('2021-09-30 15:00:03.00',3,3);
sql insert into t0 values ('2021-09-30 15:00:05.00',5,5);
sql insert into t0 values ('2021-09-30 15:00:07.00',7,7);
sql insert into t0 values ('2021-09-30 15:00:09.00',9,9);
sql insert into t1 values ('2021-09-30 15:00:00.00',0,0);
sql insert into t1 values ('2021-09-30 15:00:02.00',2,2);
sql insert into t1 values ('2021-09-30 15:00:04.00',4,4);
sql insert into t1 values ('2021-09-30 15:00:06.00',6,6);
sql insert into t1 values ('2021-09-30 15:00:08.00',8,8);
sql insert into t1 values ('2021-09-30 15:00:10.00',10,10);
sql insert into t2 values ('2021-09-30 15:00:00.00',0,0);
sql insert into t2 values ('2021-09-30 15:00:01.00',11,11);
sql insert into t2 values ('2021-09-30 15:00:02.00',22,22);
sql insert into t2 values ('2021-09-30 15:00:03.00',33,33);
sql insert into t2 values ('2021-09-30 15:00:04.00',44,44);
sql insert into t2 values ('2021-09-30 15:00:05.00',55,55);
sql insert into t3 values ('2021-09-30 15:00:00.00',0,0);
sql insert into t3 values ('2021-09-30 15:00:01.00',11,11);
sql insert into t3 values ('2021-09-30 15:00:02.00',22,22);
sql insert into t3 values ('2021-09-30 15:00:03.00',33,33);
sql insert into t3 values ('2021-09-30 15:00:04.00',44,44);
sql insert into t3 values ('2021-09-30 15:00:05.00',55,55);
sql select count(*) from meters interval(1s) group by tbname;
if $rows != 24 then
return -1
endi
sql select count(*) from (select count(*) from meters interval(1s) group by tbname) interval(1s);
if $rows != 11 then
return -1
endi
if $data00 != @21-09-30 15:00:00.000@ then
return -1
endi
if $data01 != 4 then
return -1
endi
if $data10 != @21-09-30 15:00:01.000@ then
return -1
endi
if $data11 != 3 then
return -1
endi
if $data20 != @21-09-30 15:00:02.000@ then
return -1
endi
if $data21 != 3 then
return -1
endi
if $data30 != @21-09-30 15:00:03.000@ then
return -1
endi
if $data31 != 3 then
return -1
endi
if $data40 != @21-09-30 15:00:04.000@ then
return -1
endi
if $data41 != 3 then
return -1
endi
if $data50 != @21-09-30 15:00:05.000@ then
return -1
endi
if $data51 != 3 then
return -1
endi
if $data60 != @21-09-30 15:00:06.000@ then
return -1
endi
if $data61 != 1 then
return -1
endi
if $data70 != @21-09-30 15:00:07.000@ then
return -1
endi
if $data71 != 1 then
return -1
endi
if $data80 != @21-09-30 15:00:08.000@ then
return -1
endi
if $data81 != 1 then
return -1
endi
if $data90 != @21-09-30 15:00:09.000@ then
return -1
endi
if $data91 != 1 then
return -1
endi
sql select count(*) from (select count(*) from meters interval(1s) group by area) interval(1s);
if $rows != 11 then
return -1
endi
if $data00 != @21-09-30 15:00:00.000@ then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data10 != @21-09-30 15:00:01.000@ then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data20 != @21-09-30 15:00:02.000@ then
return -1
endi
if $data21 != 2 then
return -1
endi
if $data30 != @21-09-30 15:00:03.000@ then
return -1
endi
if $data31 != 2 then
return -1
endi
if $data40 != @21-09-30 15:00:04.000@ then
return -1
endi
if $data41 != 2 then
return -1
endi
if $data50 != @21-09-30 15:00:05.000@ then
return -1
endi
if $data51 != 2 then
return -1
endi
if $data60 != @21-09-30 15:00:06.000@ then
return -1
endi
if $data61 != 1 then
return -1
endi
if $data70 != @21-09-30 15:00:07.000@ then
return -1
endi
if $data71 != 1 then
return -1
endi
if $data80 != @21-09-30 15:00:08.000@ then
return -1
endi
if $data81 != 1 then
return -1
endi
if $data90 != @21-09-30 15:00:09.000@ then
return -1
endi
if $data91 != 1 then
return -1
endi
sql select sum(sa) from (select sum(a) as sa from meters interval(1s) group by tbname) interval(1s);
if $rows != 11 then
return -1
endi
if $data00 != @21-09-30 15:00:00.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data10 != @21-09-30 15:00:01.000@ then
return -1
endi
if $data11 != 23 then
return -1
endi
if $data20 != @21-09-30 15:00:02.000@ then
return -1
endi
if $data21 != 46 then
return -1
endi
if $data30 != @21-09-30 15:00:03.000@ then
return -1
endi
if $data31 != 69 then
return -1
endi
if $data40 != @21-09-30 15:00:04.000@ then
return -1
endi
if $data41 != 92 then
return -1
endi
if $data50 != @21-09-30 15:00:05.000@ then
return -1
endi
if $data51 != 115 then
return -1
endi
if $data60 != @21-09-30 15:00:06.000@ then
return -1
endi
if $data61 != 6 then
return -1
endi
if $data70 != @21-09-30 15:00:07.000@ then
return -1
endi
if $data71 != 7 then
return -1
endi
if $data80 != @21-09-30 15:00:08.000@ then
return -1
endi
if $data81 != 8 then
return -1
endi
if $data90 != @21-09-30 15:00:09.000@ then
return -1
endi
if $data91 != 9 then
return -1
endi
sql select sum(sa) from (select sum(a) as sa from meters interval(1s) group by area) interval(1s);
if $rows != 11 then
return -1
endi
if $data00 != @21-09-30 15:00:00.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data10 != @21-09-30 15:00:01.000@ then
return -1
endi
if $data11 != 23 then
return -1
endi
if $data20 != @21-09-30 15:00:02.000@ then
return -1
endi
if $data21 != 46 then
return -1
endi
if $data30 != @21-09-30 15:00:03.000@ then
return -1
endi
if $data31 != 69 then
return -1
endi
if $data40 != @21-09-30 15:00:04.000@ then
return -1
endi
if $data41 != 92 then
return -1
endi
if $data50 != @21-09-30 15:00:05.000@ then
return -1
endi
if $data51 != 115 then
return -1
endi
if $data60 != @21-09-30 15:00:06.000@ then
return -1
endi
if $data61 != 6 then
return -1
endi
if $data70 != @21-09-30 15:00:07.000@ then
return -1
endi
if $data71 != 7 then
return -1
endi
if $data80 != @21-09-30 15:00:08.000@ then
return -1
endi
if $data81 != 8 then
return -1
endi
if $data90 != @21-09-30 15:00:09.000@ then
return -1
endi
if $data91 != 9 then
return -1
endi
sql select count(*) from (select count(*) from meters interval(1s)) interval(1s);
if $rows != 11 then
return -1
endi
if $data00 != @21-09-30 15:00:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != @21-09-30 15:00:01.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data20 != @21-09-30 15:00:02.000@ then
return -1
endi
if $data21 != 1 then
return -1
endi
if $data30 != @21-09-30 15:00:03.000@ then
return -1
endi
if $data31 != 1 then
return -1
endi
if $data40 != @21-09-30 15:00:04.000@ then
return -1
endi
if $data41 != 1 then
return -1
endi
if $data50 != @21-09-30 15:00:05.000@ then
return -1
endi
if $data51 != 1 then
return -1
endi
if $data60 != @21-09-30 15:00:06.000@ then
return -1
endi
if $data61 != 1 then
return -1
endi
if $data70 != @21-09-30 15:00:07.000@ then
return -1
endi
if $data71 != 1 then
return -1
endi
if $data80 != @21-09-30 15:00:08.000@ then
return -1
endi
if $data81 != 1 then
return -1
endi
if $data90 != @21-09-30 15:00:09.000@ then
return -1
endi
if $data91 != 1 then
return -1
endi
sql select count(*) from (select count(*) from meters interval(1s) group by tbname);
if $rows != 1 then
return -1
endi
if $data00 != 24 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册