提交 bce29321 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

......@@ -64,7 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper);
/**
* Set the input data block for the stream scan.
......
......@@ -826,7 +826,7 @@ TEST(testCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1");
TAOS_RES* pRes = taos_query(pConn, "select cast(0 as timestamp)-1y");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create database, code:%s", taos_errstr(pRes));
taos_free_result(pRes);
......
......@@ -1163,9 +1163,15 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
pColumn->varmeta.length = 0;
if (pColumn->varmeta.offset > 0) {
memset(pColumn->varmeta.offset, 0, sizeof(int32_t) * numOfRows);
}
} else {
if (pColumn->nullbitmap != NULL) {
memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows));
if (pColumn->pData != NULL) {
memset(pColumn->pData, 0, pColumn->info.bytes * numOfRows);
}
}
}
}
......
......@@ -89,6 +89,7 @@ typedef struct {
STqExecDb execDb;
};
int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper *pSchemaWrapper; // columns that are involved in query
} STqExecHandle;
typedef struct {
......
......@@ -579,9 +579,11 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
while (1) {
SSDataBlock *output = NULL;
uint64_t ts;
if (qExecTask(pItem->taskInfo, &output, &ts) < 0) {
int32_t code = qExecTask(pItem->taskInfo, &output, &ts);
if (code < 0) {
smaError("vgId:%d, qExecTask for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), suid,
pItem->level, terrstr());
pItem->level, terrstr(code));
goto _err;
}
if (!output) {
......@@ -597,7 +599,6 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
}
taosArrayPush(pResult, output);
}
if (taosArrayGetSize(pResult) > 0) {
#if 1
......@@ -616,17 +617,19 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr());
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
taosMemoryFreeClear(pReq);
taosArrayClear(pResult);
} else if (terrno == 0) {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level);
} else {
smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno));
}
}
tdDestroySDataBlockArray(pResult);
return TSDB_CODE_SUCCESS;
......
......@@ -526,8 +526,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task[i] =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
......
......@@ -93,7 +93,8 @@ int32_t tqMetaOpen(STQ* pTq) {
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols);
handle.execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols,
&handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task[i], &scanner);
......
......@@ -1940,17 +1940,20 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (pDelFile) {
SDelFReader* pDelFReader = NULL;
code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL);
if (code) {
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
SArray* aDelIdx = taosArrayInit(4, sizeof(SDelIdx));
if (aDelIdx == NULL) {
tsdbDelFReaderClose(&pDelFReader);
goto _err;
}
code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
if (code) {
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aDelIdx);
tsdbDelFReaderClose(&pDelFReader);
goto _err;
}
......@@ -1959,11 +1962,15 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (pIdx != NULL) {
code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
}
taosArrayDestroy(aDelIdx);
tsdbDelFReaderClose(&pDelFReader);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
}
}
SDelData* p = NULL;
if (pMemTbData != NULL) {
......@@ -2529,8 +2536,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
if (pDumpInfo->rowIndex >= pBlock->nRow) {
if (pDumpInfo->rowIndex >= pDumpInfo->totalRows) {
*state = CHECK_FILEBLOCK_CONT;
}
}
......
......@@ -164,6 +164,7 @@ typedef struct {
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
SSchemaWrapper* qsw;
} SSchemaInfo;
typedef struct SExecTaskInfo {
......@@ -868,7 +869,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
......
......@@ -120,7 +120,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols) {
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper) {
if (msg == NULL) {
// TODO create raw scan
return NULL;
......@@ -154,6 +154,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
}
}
*pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
return pTaskInfo;
}
......
......@@ -199,6 +199,10 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
if (pTaskInfo == NULL) {
return;
}
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
queryCostStatis(pTaskInfo); // print the query cost summary
......
......@@ -1647,11 +1647,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) {
int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows);
return pBlock->info.rows;
}
void queryCostStatis(SExecTaskInfo* pTaskInfo) {
STaskCostInfo* pSummary = &pTaskInfo->cost;
......@@ -4147,35 +4142,62 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea
static SArray* extractColumnInfo(SNodeList* pNodeList);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid);
int32_t code = metaGetTableEntryByUid(&mr, pScanNode->uid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get the table meta, uid:0x%"PRIx64", suid:0x%"PRIx64 ", %s", pScanNode->uid, pScanNode->suid,
GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
return terrno;
}
pTaskInfo->schemaInfo.tablename = strdup(mr.me.name);
SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
pSchemaInfo->tablename = strdup(mr.me.name);
if (mr.me.type == TSDB_SUPER_TABLE) {
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
tb_uid_t suid = mr.me.ctbEntry.suid;
metaGetTableEntryByUid(&mr, suid);
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pTaskInfo->schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else {
pTaskInfo->schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
metaReaderClear(&mr);
pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
return TSDB_CODE_SUCCESS;
}
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
int32_t numOfCols = LIST_LENGTH(pScanNode->pScanCols);
SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
pqSw->pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);
SColumnNode* pColNode = (SColumnNode*)pNode->pExpr;
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
pSchema->colId = pColNode->colId;
pSchema->type = pColNode->node.resType.type;
pSchema->type = pColNode->node.resType.bytes;
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
}
return pqSw;
}
static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
taosMemoryFreeClear(pSchemaInfo->dbname);
if (pSchemaInfo->sw == NULL) {
......@@ -4183,8 +4205,8 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
}
taosMemoryFree(pSchemaInfo->tablename);
taosMemoryFree(pSchemaInfo->sw->pSchema);
taosMemoryFree(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->sw);
tDeleteSSchemaWrapper(pSchemaInfo->qsw);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
......@@ -4385,7 +4407,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
......@@ -4405,7 +4427,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
......@@ -4422,11 +4444,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
if (pHandle->vnode) {
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
......@@ -4436,7 +4453,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
}
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup);
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
......@@ -4487,7 +4505,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
code = extractTableSchemaInfo(pHandle, &pScanNode->scan, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......
......@@ -1525,7 +1525,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
}
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
SExecTaskInfo* pTaskInfo) {
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -1539,6 +1539,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pTagCond = pTagCond;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pTableScanNode->watermark,
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
......@@ -1591,7 +1597,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
}
if (pTSInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pTwSup->waterMark);
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark);
} else {
pInfo->pUpdateInfo = NULL;
}
......@@ -1631,7 +1637,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createPullDataBlock();
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
pInfo->twAggSup = *pTwSup;
pOperator->name = "StreamScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
......
......@@ -66,12 +66,32 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) {
static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
static void doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) {
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, rowIndex, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, rowIndex, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, rowIndex, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, rowIndex, (const char*)&currentKey, false);
} else { // varchar/nchar data
colDataAppendNULL(pDst, rowIndex);
}
}
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
bool outOfBound) {
SPoint point1, point2, point;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
// set the primary timestamp column value
// set the primary timestamp column value
int32_t index = pBlock->info.rows;
// set the other values
......@@ -160,30 +180,13 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
} else { // fill with user specified value for each column
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
if (TSDB_COL_IS_TAG(pCol->flag)) {
continue;
}
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
colDataAppend(pDst, index, (char*)&v, false);
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else { // varchar/nchar data
colDataAppendNULL(pDst, index);
}
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
}
}
......@@ -273,7 +276,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
return outputRows;
}
} else {
assert(pFillInfo->currentKey == ts);
ASSERT(pFillInfo->currentKey == ts);
int32_t index = pBlock->info.rows;
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
......@@ -295,27 +298,32 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
char* src = colDataGetData(pSrc, pFillInfo->index);
if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) {
if (/*i == 0 || (*/!colDataIsNull_s(pSrc, pFillInfo->index)) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
} else {
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
} else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
colDataAppend(pDst, index, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull);
saveColData(pFillInfo->prev, i, src, isNull); // todo:
} else if (pFillInfo->type == TSDB_FILL_NULL) {
colDataAppendNULL(pDst, index);
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i);
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDst, index, pKey);
} else {
SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
colDataAppend(pDst, index, (char*)&pVar->i, false);
doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
}
}
}
}
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create database
sql drop database if exists d0
sql create database d0 keep 365000d,365000d,365000d
sql use d0
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int unsigned, c2 double, c3 binary(10), c4 nchar(10), c5 double) tags (city binary(20),district binary(20));
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang")
sql create table ct2 using stb tags("BeiJing", "HaiDian")
sql create table ct3 using stb tags("BeiJing", "PingGu")
sql create table ct4 using stb tags("BeiJing", "YanQing")
sql show tables
if $rows != 4 then
print rows $rows != 4
return -1
endi
print =============== step 1 insert records into ct1 - taosd merge
sql insert into ct1(ts,c1,c2) values('2022-05-03 16:59:00.010', 10, 20);
sql insert into ct1(ts,c1,c2,c3,c4) values('2022-05-03 16:59:00.011', 11, NULL, 'binary', 'nchar');
sql insert into ct1 values('2022-05-03 16:59:00.016', 16, NULL, NULL, 'nchar', NULL);
sql insert into ct1 values('2022-05-03 16:59:00.016', 17, NULL, NULL, 'nchar', 170);
sql insert into ct1 values('2022-05-03 16:59:00.020', 20, NULL, NULL, 'nchar', 200);
sql insert into ct1 values('2022-05-03 16:59:00.016', 18, NULL, NULL, 'nchar', 180);
sql insert into ct1 values('2022-05-03 16:59:00.021', 21, NULL, NULL, 'nchar', 210);
sql insert into ct1 values('2022-05-03 16:59:00.022', 22, NULL, NULL, 'nchar', 220);
print =============== step 2 insert records into ct1/ct2 - taosc merge for 2022-05-03 16:59:00.010
sql insert into ct1(ts,c1,c2) values('2022-05-03 16:59:00.010', 10,10), ('2022-05-03 16:59:00.010',20,10.0), ('2022-05-03 16:59:00.010',30,NULL) ct2(ts,c1) values('2022-05-03 16:59:00.010',10), ('2022-05-03 16:59:00.010',20) ct1(ts,c2) values('2022-05-03 16:59:00.010',10), ('2022-05-03 16:59:00.010',100) ct1(ts,c3) values('2022-05-03 16:59:00.010','bin1'), ('2022-05-03 16:59:00.010','bin2') ct1(ts,c4,c5) values('2022-05-03 16:59:00.010',NULL,NULL), ('2022-05-03 16:59:00.010','nchar4',1000.01) ct2(ts,c2,c3,c4,c5) values('2022-05-03 16:59:00.010',20,'xkl','zxc',10);
print =============== step 3 insert records into ct3
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.020', 10,10);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.021', 10,10), ('2022-05-03 16:59:00.021',20,20.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.022', 30,30), ('2022-05-03 16:59:00.022',40,40.0),('2022-05-03 16:59:00.022',50,50.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.023', 60,60), ('2022-05-03 16:59:00.023',70,70.0),('2022-05-03 16:59:00.023',80,80.0), ('2022-05-03 16:59:00.023',90,90.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.024', 100,100), ('2022-05-03 16:59:00.025',110,110.0),('2022-05-03 16:59:00.025',120,120.0), ('2022-05-03 16:59:00.025',130,130.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.030', 140,140), ('2022-05-03 16:59:00.030',150,150.0),('2022-05-03 16:59:00.031',160,160.0), ('2022-05-03 16:59:00.030',170,170.0), ('2022-05-03 16:59:00.031',180,180.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.042', 190,190), ('2022-05-03 16:59:00.041',200,200.0),('2022-05-03 16:59:00.040',210,210.0);
sql insert into ct3(ts,c1,c5) values('2022-05-03 16:59:00.050', 220,220), ('2022-05-03 16:59:00.051',230,230.0),('2022-05-03 16:59:00.052',240,240.0);
print =============== step 4 insert records into ct4
sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.020', 10,'b0','n0');
sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.021', 20,'b1','n1'), ('2022-05-03 16:59:00.021',30,'b2','n2');
sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.022', 40,'b3','n3'), ('2022-05-03 16:59:00.022',40,'b4','n4'),('2022-05-03 16:59:00.022',50,'b5','n5');
sql insert into ct4(ts,c1,c3,c4) values('2022-05-03 16:59:00.023', 60,'b6','n6'), ('2022-05-03 16:59:00.024',70,'b7','n7'),('2022-05-03 16:59:00.024',80,'b8','n8'), ('2022-05-03 16:59:00.023',90,'b9','n9');
print =============== step 5 query records of ct1 from memory(taosc and taosd merge)
sql select * from ct1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print =============== step 6 query records of ct2 from memory(taosc and taosd merge)
sql select * from ct2;
print $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
print rows $rows != 1
return -1
endi
print =============== step 7 query records of ct3 from memory
sql select * from ct3;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print $data60 $data61 $data62 $data63 $data64 $data65
print $data70 $data71 $data72 $data73 $data74 $data75
print $data80 $data81 $data82 $data83 $data84 $data85
print $data90 $data91 $data92 $data93 $data94 $data95
print $data[10][0] $data[10][1] $data[10][2] $data[10][3] $data[10][4] $data[10][5]
print $data[11][0] $data[11][1] $data[11][2] $data[11][3] $data[11][4] $data[11][5]
print $data[12][0] $data[12][1] $data[12][2] $data[12][3] $data[12][4] $data[12][5]
print $data[13][0] $data[13][1] $data[13][2] $data[13][3] $data[13][4] $data[13][5]
if $rows != 14 then
print rows $rows != 14
return -1
endi
print =============== step 8 query records of ct4 from memory
sql select * from ct4;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
print rows $rows != 5
return -1
endi
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== step 9 query records of ct1 from file
sql select * from ct1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
if $rows != 6 then
print rows $rows != 6
return -1
endi
print =============== step 10 query records of ct2 from file
sql select * from ct2;
print $data00 $data01 $data02 $data03 $data04 $data05
if $rows != 1 then
print rows $rows != 1
return -1
endi
print =============== step 11 query records of ct3 from file
sql select * from ct3;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print $data60 $data61 $data62 $data63 $data64 $data65
print $data70 $data71 $data72 $data73 $data74 $data75
print $data80 $data81 $data82 $data83 $data84 $data85
print $data90 $data91 $data92 $data93 $data94 $data95
print $data[10][0] $data[10][1] $data[10][2] $data[10][3] $data[10][4] $data[10][5]
print $data[11][0] $data[11][1] $data[11][2] $data[11][3] $data[11][4] $data[11][5]
print $data[12][0] $data[12][1] $data[12][2] $data[12][3] $data[12][4] $data[12][5]
print $data[13][0] $data[13][1] $data[13][2] $data[13][3] $data[13][4] $data[13][5]
print =============== step 12 query records of ct4 from file
sql select * from ct4;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
\ No newline at end of file
Subproject commit 69b558ccbfe54a4407fe23eeae2e67c540f59e55
Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
Subproject commit 267a96fb09fc2ba14acfa47f7d3678def64c29c5
Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册