提交 3d08e526 编写于 作者: wmmhello's avatar wmmhello

refactor:merge from 3.0

...@@ -8,135 +8,202 @@ def skipbuild=0 ...@@ -8,135 +8,202 @@ def skipbuild=0
def win_stop=0 def win_stop=0
def abortPreviousBuilds() { def abortPreviousBuilds() {
def currentJobName = env.JOB_NAME def currentJobName = env.JOB_NAME
def currentBuildNumber = env.BUILD_NUMBER.toInteger() def currentBuildNumber = env.BUILD_NUMBER.toInteger()
def jobs = Jenkins.instance.getItemByFullName(currentJobName) def jobs = Jenkins.instance.getItemByFullName(currentJobName)
def builds = jobs.getBuilds() def builds = jobs.getBuilds()
for (build in builds) { for (build in builds) {
if (!build.isBuilding()) { if (!build.isBuilding()) {
continue; continue;
} }
if (currentBuildNumber == build.getNumber().toInteger()) { if (currentBuildNumber == build.getNumber().toInteger()) {
continue; continue;
} }
build.doKill() //doTerm(),doKill(),doTerm() build.doKill() //doTerm(),doKill(),doTerm()
} }
} }
// abort previous build // abort previous build
abortPreviousBuilds() abortPreviousBuilds()
def abort_previous(){ def abort_previous(){
def buildNumber = env.BUILD_NUMBER as int def buildNumber = env.BUILD_NUMBER as int
if (buildNumber > 1) milestone(buildNumber - 1) if (buildNumber > 1) milestone(buildNumber - 1)
milestone(buildNumber) milestone(buildNumber)
} }
def pre_test(){ def pre_test(){
sh'hostname' sh 'hostname'
sh ''' sh '''
date date
sudo rmtaos || echo "taosd has not installed" sudo rmtaos || echo "taosd has not installed"
''' '''
sh ''' sh '''
killall -9 taosd ||echo "no taosd running" killall -9 taosd ||echo "no taosd running"
killall -9 gdb || echo "no gdb running" killall -9 gdb || echo "no gdb running"
killall -9 python3.8 || echo "no python program running" killall -9 python3.8 || echo "no python program running"
cd ${WKC} cd ${WKC}
''' '''
script { script {
if (env.CHANGE_TARGET == 'master') { if (env.CHANGE_TARGET == 'master') {
sh ''' sh '''
cd ${WKC} cd ${WKC}
git checkout master git checkout master
''' '''
} else if(env.CHANGE_TARGET == '2.0') {
sh '''
cd ${WKC}
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
sh '''
cd ${WKC}
git checkout 3.0
[ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../..
'''
} else {
sh '''
cd ${WKC}
git checkout develop
'''
} }
else if(env.CHANGE_TARGET == '2.0'){ }
sh ''' sh '''
cd ${WKC}
git checkout 2.0
'''
}
else if(env.CHANGE_TARGET == '3.0'){
sh '''
cd ${WKC} cd ${WKC}
git checkout 3.0 git pull >/dev/null
[ -d contrib/bdb ] && cd contrib/bdb && git clean -fxd && cd ../.. git fetch origin +refs/pull/${CHANGE_ID}/merge
''' git checkout -qf FETCH_HEAD
} git submodule update --init --recursive
else{ '''
sh ''' sh '''
cd ${WKC} cd ${WKC}
git checkout develop export TZ=Asia/Harbin
''' date
} rm -rf debug
} mkdir debug
sh''' cd debug
cd ${WKC} cmake .. > /dev/null
git pull >/dev/null make -j4> /dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git submodule update --init --recursive
''' '''
sh''' sh '''
cd ${WKC} cd ${WKPY}
export TZ=Asia/Harbin git reset --hard
date git pull
rm -rf debug pip3 install .
mkdir debug
cd debug
cmake .. > /dev/null
make -j4> /dev/null
''' '''
sh''' return 1
cd ${WKPY} }
git reset --hard def pre_test_win(){
git pull bat '''
pip3 install . hostname
date /t
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug
exit 0
'''
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git reset --hard
git fetch || git fetch
git checkout -f
'''
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout master
'''
} else if(env.CHANGE_TARGET == '2.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout 2.0
'''
} else if(env.CHANGE_TARGET == '3.0') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout 3.0
'''
} else {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git checkout develop
'''
}
}
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
git branch
git pull || git pull
git fetch origin +refs/pull/%CHANGE_ID%/merge
git checkout -qf FETCH_HEAD
'''
}
def pre_test_build_win() {
bat '''
echo "building ..."
time /t
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine
mkdir debug
cd debug
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
set CL=/MP8
cmake .. -G "NMake Makefiles JOM"
jom -j 4 || exit 8
time /t
''' '''
return 1 return 1
} }
pipeline { pipeline {
agent none agent none
options { skipDefaultCheckout() } options { skipDefaultCheckout() }
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine' WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python' WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
} }
stages { stages {
stage('pre_build'){ stage('run test') {
agent{label " slave3_0 || slave15 || slave16 || slave17 "} parallel {
options { skipDefaultCheckout() } stage('windows test') {
when { agent {label " windows11 "}
changeRequest() steps {
} pre_test_win()
steps { pre_test_build_win()
script{ }
abort_previous() }
abortPreviousBuilds() stage('linux test') {
} agent{label " slave3_0 || slave15 || slave16 || slave17 "}
timeout(time: 45, unit: 'MINUTES'){ options { skipDefaultCheckout() }
pre_test() when {
sh''' changeRequest()
cd ${WKC}/debug }
ctest -VV steps {
''' timeout(time: 45, unit: 'MINUTES'){
sh''' pre_test()
export LD_LIBRARY_PATH=${WKC}/debug/build/lib sh '''
cd ${WKC}/tests/system-test cd ${WKC}/debug
./fulltest.sh ctest -VV
''' '''
sh''' sh '''
cd ${WKC}/tests export LD_LIBRARY_PATH=${WKC}/debug/build/lib
./test-all.sh b1fq cd ${WKC}/tests/system-test
''' ./fulltest.sh
'''
sh '''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
}
} }
} }
} }
} post {
post {
success { success {
emailext ( emailext (
subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS", subject: "PR-result: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]' SUCCESS",
......
...@@ -2578,6 +2578,28 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { ...@@ -2578,6 +2578,28 @@ static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
} }
#define TD_AUTO_CREATE_TABLE 0x1
typedef struct {
int64_t suid;
int64_t uid;
int32_t sver;
uint64_t nData;
const void* pData;
SVCreateTbReq cTbReq;
} SVSubmitBlk;
typedef struct {
int32_t flags;
int32_t nBlocks;
union {
SArray* pArray;
SVSubmitBlk* pBlocks;
};
} SVSubmitReq;
int32_t tEncodeSVSubmitReq(SCoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SCoder* pCoder, SVSubmitReq* pReq);
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -3860,3 +3860,65 @@ int32_t tDecodeSVDropStbReq(SCoder *pCoder, SVDropStbReq *pReq) { ...@@ -3860,3 +3860,65 @@ int32_t tDecodeSVDropStbReq(SCoder *pCoder, SVDropStbReq *pReq) {
tEndDecode(pCoder); tEndDecode(pCoder);
return 0; return 0;
} }
static int32_t tEncodeSVSubmitBlk(SCoder *pCoder, const SVSubmitBlk *pBlock, int32_t flags) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pBlock->suid) < 0) return -1;
if (tEncodeI64(pCoder, pBlock->uid) < 0) return -1;
if (tEncodeI32v(pCoder, pBlock->sver) < 0) return -1;
if (tEncodeBinary(pCoder, pBlock->pData, pBlock->nData) < 0) return -1;
if (flags & TD_AUTO_CREATE_TABLE) {
if (tEncodeSVCreateTbReq(pCoder, &pBlock->cTbReq) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
static int32_t tDecodeSVSubmitBlk(SCoder *pCoder, SVSubmitBlk *pBlock, int32_t flags) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pBlock->suid) < 0) return -1;
if (tDecodeI64(pCoder, &pBlock->uid) < 0) return -1;
if (tDecodeI32v(pCoder, &pBlock->sver) < 0) return -1;
if (tDecodeBinary(pCoder, &pBlock->pData, &pBlock->nData) < 0) return -1;
if (flags & TD_AUTO_CREATE_TABLE) {
if (tDecodeSVCreateTbReq(pCoder, &pBlock->cTbReq) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSVSubmitReq(SCoder *pCoder, const SVSubmitReq *pReq) {
int32_t nBlocks = taosArrayGetSize(pReq->pArray);
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
if (tEncodeI32v(pCoder, nBlocks) < 0) return -1;
for (int32_t iBlock = 0; iBlock < nBlocks; iBlock++) {
if (tEncodeSVSubmitBlk(pCoder, (SVSubmitBlk *)taosArrayGet(pReq->pArray, iBlock), pReq->flags) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSVSubmitReq(SCoder *pCoder, SVSubmitReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->nBlocks) < 0) return -1;
pReq->pBlocks = tCoderMalloc(pCoder, sizeof(SVSubmitBlk) * pReq->nBlocks);
if (pReq->pBlocks == NULL) return -1;
for (int32_t iBlock = 0; iBlock < pReq->nBlocks; iBlock++) {
if (tDecodeSVSubmitBlk(pCoder, pReq->pBlocks + iBlock, pReq->flags) < 0) return -1;
}
tEndDecode(pCoder);
return 0;
}
\ No newline at end of file
...@@ -30,6 +30,7 @@ target_sources( ...@@ -30,6 +30,7 @@ target_sources(
"src/tsdb/tsdbFS.c" "src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c" "src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c" "src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c" "src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbSma.c" "src/tsdb/tsdbSma.c"
......
...@@ -40,7 +40,6 @@ typedef struct STable STable; ...@@ -40,7 +40,6 @@ typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
......
...@@ -100,6 +100,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version ...@@ -100,6 +100,7 @@ int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, int32_t* pAffectedRows);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
...@@ -125,7 +126,6 @@ int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); ...@@ -125,7 +126,6 @@ int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore); void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore); void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType); int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
int32_t tsdbProcessSubmitReq(STsdb* pTsdb, int64_t version, void* pReq);
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
......
...@@ -69,10 +69,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith); ...@@ -69,10 +69,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static void tsdbResetCommitFile(SCommitH *pCommith); static void tsdbResetCommitFile(SCommitH *pCommith);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
// static int tsdbCommitMeta(STsdbRepo *pRepo);
// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact);
// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid);
// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile);
static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
......
...@@ -191,9 +191,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -191,9 +191,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
} }
int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) { int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// int32_t points = 0;
// STable *pTable = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
STsdbMemTable *pMemTable = pTsdb->mem; STsdbMemTable *pMemTable = pTsdb->mem;
void *tptr; void *tptr;
...@@ -221,8 +218,9 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -221,8 +218,9 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
} }
// copy data to buffer pool // copy data to buffer pool
pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pMsgIter->dataLen + sizeof(*pBlock)); int32_t tlen = pMsgIter->dataLen + pMsgIter->schemaLen + sizeof(*pBlock);
memcpy(pBlkCopy, pBlock, pMsgIter->dataLen + sizeof(*pBlock)); pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, tlen);
memcpy(pBlkCopy, pBlock, tlen);
tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter); tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
if (blkIter.row == NULL) return 0; if (blkIter.row == NULL) return 0;
...@@ -241,7 +239,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo ...@@ -241,7 +239,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
(*pAffectedRows) += pMsgIter->numOfRows; (*pAffectedRows) = pMsgIter->numOfRows;
return 0; return 0;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef struct SMemTable SMemTable;
typedef struct SMemData SMemData;
typedef struct SMemSkipList SMemSkipList;
typedef struct SMemSkipListCfg SMemSkipListCfg;
struct SMemTable {
STsdb *pTsdb;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
int64_t nRows;
int32_t nHash;
int32_t nBucket;
SMemData **pBuckets;
};
struct SMemSkipListCfg {
int8_t maxLevel;
int32_t nKey;
int32_t nData;
};
struct SMemSkipList {
int8_t level;
uint32_t seed;
};
struct SMemData {
SMemData *pHashNext;
tb_uid_t suid;
tb_uid_t uid;
TSKEY minKey;
TSKEY maxKey;
int64_t minVer;
int64_t maxVer;
int64_t nRows;
SMemSkipList sl;
};
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL;
pMemTb = taosMemoryCalloc(1, sizeof(*pMemTb));
if (pMemTb == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMemTb->pTsdb = pTsdb;
pMemTb->minKey = TSKEY_MAX;
pMemTb->maxKey = TSKEY_MIN;
pMemTb->minVer = -1;
pMemTb->maxVer = -1;
pMemTb->nRows = 0;
pMemTb->nHash = 0;
pMemTb->nBucket = 1024;
pMemTb->pBuckets = taosMemoryCalloc(pMemTb->nBucket, sizeof(*pMemTb->pBuckets));
if (pMemTb->pBuckets == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
*ppMemTb = pMemTb;
return 0;
}
int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMT) {
// TODO
return 0;
}
// SMemData
// SMemSkipList
\ No newline at end of file
...@@ -1962,6 +1962,21 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { ...@@ -1962,6 +1962,21 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
......
...@@ -450,7 +450,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -450,7 +450,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SCoder coder = {0}; SCoder coder = {0};
int ret; int ret;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
pRsp->contLen = 0; pRsp->contLen = 0;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -473,9 +473,13 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -473,9 +473,13 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
/* code */ /* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq); ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
if (ret < 0) { if (ret < 0) {
dropTbRsp.code = TSDB_CODE_SUCCESS; if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS;
} else {
dropTbRsp.code = terrno;
}
} else { } else {
dropTbRsp.code = terrno; dropTbRsp.code = TSDB_CODE_SUCCESS;
} }
taosArrayPush(rsp.pArray, &dropTbRsp); taosArrayPush(rsp.pArray, &dropTbRsp);
...@@ -488,20 +492,55 @@ _exit: ...@@ -488,20 +492,55 @@ _exit:
} }
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp rsp = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock;
SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0};
SCoder coder = {0};
int32_t nRows;
pRsp->code = 0; pRsp->code = 0;
// handle the request // handle the request
if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) { if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
pRsp->code = terrno; pRsp->code = TSDB_CODE_INVALID_MSG;
return -1; goto _exit;
} }
// pRsp->msgType = TDMT_VND_SUBMIT_RSP; for (;;) {
// vnodeProcessSubmitReq(pVnode, ptr, pRsp); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
// create table for auto create table mode
if (msgIter.schemaLen > 0) {
tCoderInit(&coder, TD_LITTLE_ENDIAN, pBlock->data, msgIter.schemaLen, TD_DECODER);
if (tDecodeSVCreateTbReq(&coder, &createTbReq) < 0) {
pRsp->code = TSDB_CODE_INVALID_MSG;
tCoderClear(&coder);
goto _exit;
}
if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
pRsp->code = terrno;
tCoderClear(&coder);
goto _exit;
}
}
tCoderClear(&coder);
}
if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &nRows) < 0) {
pRsp->code = terrno;
goto _exit;
}
rsp.numOfRows += nRows;
}
_exit:
// encode the response (TODO) // encode the response (TODO)
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
memcpy(pRsp->pCont, &rsp, sizeof(rsp)); memcpy(pRsp->pCont, &rsp, sizeof(rsp));
...@@ -511,18 +550,3 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -511,18 +550,3 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
return 0; return 0;
} }
int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if (!pReq) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include "ttime.h" #include "ttime.h"
#include "ttypes.h" #include "ttypes.h"
// clang-format off
#define NEXT_TOKEN(pSql, sToken) \ #define NEXT_TOKEN(pSql, sToken) \
do { \ do { \
int32_t index = 0; \ int32_t index = 0; \
...@@ -248,12 +247,11 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool ...@@ -248,12 +247,11 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
} else { } else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
&pCxt->pTableMeta)); &pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
} }
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -826,12 +824,21 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { ...@@ -826,12 +824,21 @@ static int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, STableMeta* pMeta) { static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName* pTableName, const char* pName,
int32_t len, STableMeta* pMeta) {
SVgroupInfo vg;
SParseContext* pBasicCtx = pCxt->pComCxt;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = tGenIdPI64();
pMeta->vgId = vg.vgId;
STableMeta* pBackup = NULL; STableMeta* pBackup = NULL;
if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) { if (TSDB_CODE_SUCCESS != cloneTableMeta(pMeta, &pBackup)) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pBackup->uid = tGenIdPI64();
return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES); return taosHashPut(pHash, pName, len, &pBackup, POINTER_BYTES);
} }
...@@ -854,7 +861,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) ...@@ -854,7 +861,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
} }
CHECK_CODE(storeTableMeta(pCxt->pSubTableHashObj, tbFName, len, pCxt->pTableMeta)); CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
...@@ -1257,7 +1264,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash ...@@ -1257,7 +1264,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen){ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen){
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) { if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
...@@ -1305,11 +1312,10 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tN ...@@ -1305,11 +1312,10 @@ int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tN
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen) { STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder; SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder}; SMemParam param = {.rb = pBuilder};
...@@ -1384,10 +1390,11 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in ...@@ -1384,10 +1390,11 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBuf, int32_t msgBufLen, int32_t colIdx, int32_t rowNum) { int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
STableDataBlocks *pDataBlock = (STableDataBlocks *)pBlock; int32_t rowNum) {
SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta); STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
int32_t extendedRowSize = getExtendedRowSize(pDataBlock); SSchema* pSchema = getTableColumnSchema(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo;
SRowBuilder* pBuilder = &pDataBlock->rowBuilder; SRowBuilder* pBuilder = &pDataBlock->rowBuilder;
SMemParam param = {.rb = pBuilder}; SMemParam param = {.rb = pBuilder};
...@@ -1452,7 +1459,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBu ...@@ -1452,7 +1459,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_MULTI_BIND *bind, char *msgBu
} }
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
if(rowEnd) { if (rowEnd) {
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols); STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
tdSRowPrint(row, pSTSchema, __func__); tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema); taosMemoryFree(pSTSchema);
......
...@@ -497,14 +497,9 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p ...@@ -497,14 +497,9 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0); ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
} }
int32_t len = pBlocks->numOfRows *
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = int32_t finalLen =
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload); trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
......
...@@ -191,13 +191,13 @@ class TDTestCase: ...@@ -191,13 +191,13 @@ class TDTestCase:
def support_types(self): def support_types(self):
type_error_sql_lists = [ type_error_sql_lists = [
"select log(ts ,2 ) from t1" , "select log(ts ,2 ) from t1" ,
"select log(c7,2 ) from t1", "select log(c7,c2 ) from t1",
"select log(c8,2 ) from t1", "select log(c8,c1 ) from t1",
"select log(c9,2 ) from t1", "select log(c9,c2 ) from t1",
"select log(ts,2 ) from ct1" , "select log(ts,c7 ) from ct1" ,
"select log(c7,2 ) from ct1", "select log(c7,c9 ) from ct1",
"select log(c8,2 ) from ct1", "select log(c8,c2 ) from ct1",
"select log(c9,2 ) from ct1", "select log(c9,c1 ) from ct1",
"select log(ts,2 ) from ct3" , "select log(ts,2 ) from ct3" ,
"select log(c7,2 ) from ct3", "select log(c7,2 ) from ct3",
"select log(c8,2 ) from ct3", "select log(c8,2 ) from ct3",
......
此差异已折叠。
...@@ -22,4 +22,5 @@ python3 ./test.py -f 2-query/abs.py ...@@ -22,4 +22,5 @@ python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/ceil.py python3 ./test.py -f 2-query/ceil.py
python3 ./test.py -f 2-query/floor.py python3 ./test.py -f 2-query/floor.py
python3 ./test.py -f 2-query/round.py python3 ./test.py -f 2-query/round.py
python3 ./test.py -f 2-query/log.py python3 ./test.py -f 2-query/log.py
\ No newline at end of file python3 ./test.py -f 2-query/pow.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册