diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index aa60803dac740157e1a9f5229e524bca8401c4cd..61378c79c4b5c44ffa11ae9132aa6f8b89ab5f71 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -448,6 +448,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols int dcol = 0; while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -458,13 +459,14 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols STColumn *pRowCol = schemaColAt(pSchema, rcol); if (pRowCol->colId == pDataCol->colId) { void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); dcol++; rcol++; } else if (pRowCol->colId < pDataCol->colId) { rcol++; } else { - if(forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } dcol++; @@ -482,6 +484,7 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo int nRowCols = kvRowNCols(row); while (dcol < pCols->numOfCols) { + bool setCol = 0; SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); @@ -493,13 +496,14 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo if (colIdx->colId == pDataCol->colId) { void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + if(!isNull(value, pDataCol->type)) setCol = 1; dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); ++dcol; ++rcol; } else if (colIdx->colId < pDataCol->colId) { ++rcol; } else { - if (forceSetNull) { + if(forceSetNull || setCol) { dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); } ++dcol; @@ -518,7 +522,6 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b } } -//TODO: refactor this function to eliminate additional memory copy int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -534,7 +537,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { - if (source->cols[j].len > 0) { + if (source->cols[j].len > 0 || target->cols[j].len > 0) { dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, target->maxPoints); } @@ -578,7 +581,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i if (key1 < key2) { for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); - if (src1->cols[i].len > 0) { + if (src1->cols[i].len > 0 || target->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); } @@ -596,6 +599,8 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, target->maxPoints); + } else if(target->cols[i].len > 0) { + dataColSetNullAt(&target->cols[i], target->numOfRows); } } target->numOfRows++; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 15fc3cc47d663aa6f2fb8910ebbadc03861418c7..03110487807076bf8ac2ac7026ffdb828ea4c7c6 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1418,13 +1418,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - bool isRowDel = false; SMemRow row = tsdbNextIterRow(pCommitIter->pIter); if (row == NULL || memRowKey(row) > maxKey) { key2 = INT64_MAX; } else { key2 = memRowKey(row); - isRowDel = memRowDeleted(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; @@ -1439,36 +1437,33 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt pTarget->numOfRows++; (*iter)++; } else if (key1 > key2) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); } + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + tSkipListIterNext(pCommitIter->pIter); } else { - if (update) { - if (!isRowDel) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); - } - } else { - ASSERT(!isRowDel); - + if (update != TD_ROW_OVERWRITE_UPDATE) { + //copy disk data for (int i = 0; i < pDataCols->numOfCols; i++) { //TODO: dataColAppendVal may fail dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, pTarget->maxPoints); } - pTarget->numOfRows++; + if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + } + if (update != TD_ROW_DISCARD_UPDATE) { + //copy mem data + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } (*iter)++; tSkipListIterNext(pCommitIter->pIter); diff --git a/tests/gotest/batchtest.sh b/tests/gotest/batchtest.sh index eae467dac046a48fff1ab0541df621ef3f4de162..95d984a7013ef4e7715af8764531434a564a382b 100755 --- a/tests/gotest/batchtest.sh +++ b/tests/gotest/batchtest.sh @@ -17,7 +17,7 @@ go env -w GO111MODULE=on go env -w GOPROXY=https://goproxy.io,direct bash ./case001/case001.sh $severIp $serverPort -#bash ./case002/case002.sh $severIp $serverPort +bash ./case002/case002.sh $severIp $serverPort #bash ./case003/case003.sh $severIp $serverPort cd nanosupport diff --git a/tests/gotest/case001/case001.go b/tests/gotest/case001/case001.go index fb94f566dd7fa9ef8932bc28326310681998b410..29bc92f2a0668b3f576145d5bd6d08ed37c82f1b 100644 --- a/tests/gotest/case001/case001.go +++ b/tests/gotest/case001/case001.go @@ -18,7 +18,7 @@ import ( "database/sql" "flag" "fmt" - _ "github.com/taosdata/driver-go/taosSql" + _ "github.com/taosdata/driver-go/v2/taosSql" "log" "strconv" "time" @@ -62,6 +62,7 @@ func main() { url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/" // open connect to taos server + fmt.Printf("url:%s",url) db, err := sql.Open(taosDriverName, url) if err != nil { log.Fatalf("Open database error: %s\n", err) @@ -167,17 +168,18 @@ func insert_data(db *sql.DB, demot string) { func select_data(db *sql.DB, demot string) { st := time.Now().Nanosecond() - + fmt.Println(demot) rows, err := db.Query("select * from ? ", demot) // go text mode + fmt.Println("end query",err) checkErr(err, "select db.Query") fmt.Printf("%10s%s%8s %5s %9s%s %s %8s%s %7s%s %8s%s %4s%s %5s%s\n", " ", "ts", " ", "id", " ", "name", " ", "len", " ", "flag", " ", "notes", " ", "fv", " ", " ", "dv") var affectd int //decoder := mahonia.NewDecoder("gbk") // 把原来ANSI格式的文本文件里的字符,用gbk进行解码。 - + fmt.Println("start next") for rows.Next() { - var ts string + var ts time.Time var name string var id int var len int8 @@ -187,6 +189,7 @@ func select_data(db *sql.DB, demot string) { var dv float64 err = rows.Scan(&ts, &id, &name, &len, &flag, ¬es, &fv, &dv) + fmt.Println("rows:",err) checkErr(err, "select rows.Scan") fmt.Printf("%s|\t", ts) diff --git a/tests/gotest/case002/case002.bat b/tests/gotest/case002/case002.bat new file mode 100644 index 0000000000000000000000000000000000000000..ebec576e724ccb14319dd380c9783a783ac0db62 --- /dev/null +++ b/tests/gotest/case002/case002.bat @@ -0,0 +1,9 @@ +@echo off +echo ==== start run cases001.go + +del go.* +go mod init demotest +go build +demotest.exe -h %1 -p %2 +cd .. + diff --git a/tests/gotest/case002/case002.go b/tests/gotest/case002/case002.go new file mode 100644 index 0000000000000000000000000000000000000000..c69da04cb271c24e33953ca8fdfea71c67349b4f --- /dev/null +++ b/tests/gotest/case002/case002.go @@ -0,0 +1,81 @@ +package main + +import ( + "database/sql/driver" + "fmt" + "io" + "os" + "time" + + taos "github.com/taosdata/driver-go/v2/af" +) + +func Subscribe_check(topic taos.Subscriber, check int) bool { + count := 0 + rows, err := topic.Consume() + defer func() { rows.Close(); time.Sleep(time.Second) }() + if err != nil { + fmt.Println(err) + os.Exit(3) + } + for { + values := make([]driver.Value, 2) + err := rows.Next(values) + if err == io.EOF { + break + } else if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(4) + } + count++ + } + if count == check { + return false + } else { + return true + } +} +func main() { + ts := 1630461600000 + db, err := taos.Open("127.0.0.1", "", "", "", 0) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + defer db.Close() + db.Exec("drop if exists database test") + db.Exec("create if not exists database test") + db.Exec("use test") + db.Exec("drop if exists database test") + db.Exec("create table test (ts timestamp ,level int)") + for i := 0; i < 10; i++ { + sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+i, i) + db.Exec(sqlcmd) + } + + fmt.Println("consumption 01.") + topic, err := db.Subscribe(false, "test", "select ts, level from test", time.Second) + if Subscribe_check(topic, 10) { + os.Exit(3) + } + + fmt.Println("consumption 02: no new rows inserted") + if Subscribe_check(topic, 0) { + os.Exit(3) + } + + fmt.Println("consumption 03: after one new rows inserted") + sqlcmd := fmt.Sprintf("insert into test values(%d,%d)", ts+10, 10) + db.Exec(sqlcmd) + if Subscribe_check(topic, 1) { + os.Exit(3) + } + + fmt.Println("consumption 04: keep progress and continue previous subscription") + topic.Unsubscribe(true) + topic, err = db.Subscribe(false, "test", "select ts, level from test", time.Second) + if Subscribe_check(topic, 0) { + os.Exit(3) + } + +} diff --git a/tests/gotest/case002/case002.sh b/tests/gotest/case002/case002.sh new file mode 100644 index 0000000000000000000000000000000000000000..94e5bb44e03a1f7d2704752fcf9c080abcb4f23f --- /dev/null +++ b/tests/gotest/case002/case002.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +echo "==== start run cases001.go" + +set +e +#set -x + +script_dir="$(dirname $(readlink -f $0))" +#echo "pwd: $script_dir, para0: $0" + +#execName=$0 +#execName=`echo ${execName##*/}` +#goName=`echo ${execName%.*}` + +###### step 3: start build +cd $script_dir +rm -f go.* +go mod init demotest > /dev/null 2>&1 +go mod tidy > /dev/null 2>&1 +go build > /dev/null 2>&1 +sleep 1s +./demotest -h $1 -p $2