提交 d58434c3 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/crash_gen

...@@ -36,6 +36,8 @@ matrix: ...@@ -36,6 +36,8 @@ matrix:
- psmisc - psmisc
before_script: before_script:
- export TZ=Asia/Harbin
- date
- cd ${TRAVIS_BUILD_DIR} - cd ${TRAVIS_BUILD_DIR}
- mkdir debug - mkdir debug
- cd debug - cd debug
...@@ -150,6 +152,8 @@ matrix: ...@@ -150,6 +152,8 @@ matrix:
- DESC="trusty/gcc-4.8 build" - DESC="trusty/gcc-4.8 build"
before_script: before_script:
- export TZ=Asia/Harbin
- date
- cd ${TRAVIS_BUILD_DIR} - cd ${TRAVIS_BUILD_DIR}
- mkdir debug - mkdir debug
- cd debug - cd debug
...@@ -173,6 +177,8 @@ matrix: ...@@ -173,6 +177,8 @@ matrix:
- cmake - cmake
before_script: before_script:
- export TZ=Asia/Harbin
- date
- cd ${TRAVIS_BUILD_DIR} - cd ${TRAVIS_BUILD_DIR}
- mkdir debug - mkdir debug
- cd debug - cd debug
...@@ -197,6 +203,8 @@ matrix: ...@@ -197,6 +203,8 @@ matrix:
- cmake - cmake
before_script: before_script:
- export TZ=Asia/Harbin
- date
- cd ${TRAVIS_BUILD_DIR} - cd ${TRAVIS_BUILD_DIR}
- mkdir debug - mkdir debug
- cd debug - cd debug
...@@ -225,6 +233,8 @@ matrix: ...@@ -225,6 +233,8 @@ matrix:
- DESC="trusty/gcc-4.8 build" - DESC="trusty/gcc-4.8 build"
before_script: before_script:
- export TZ=Asia/Harbin
- date
- cd ${TRAVIS_BUILD_DIR} - cd ${TRAVIS_BUILD_DIR}
- mkdir debug - mkdir debug
- cd debug - cd debug
......
...@@ -84,7 +84,7 @@ typedef struct SRetrieveSupport { ...@@ -84,7 +84,7 @@ typedef struct SRetrieveSupport {
SColumnModel * pFinalColModel; // colModel for final result SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState; SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj; SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times uint32_t numOfRetry; // record the number of retry times
pthread_mutex_t queryMutex; pthread_mutex_t queryMutex;
......
...@@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql); ...@@ -36,6 +36,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql); int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql); void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult); void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
......
...@@ -213,8 +213,7 @@ typedef struct SQueryInfo { ...@@ -213,8 +213,7 @@ typedef struct SQueryInfo {
typedef struct { typedef struct {
int command; int command;
uint8_t msgType; uint8_t msgType;
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
int8_t dataSourceType; // load data from file or not
union { union {
int32_t count; int32_t count;
...@@ -222,18 +221,23 @@ typedef struct { ...@@ -222,18 +221,23 @@ typedef struct {
}; };
int32_t insertType; int32_t insertType;
int32_t clauseIndex; // index of multiple subclause query int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished; int8_t parseFinished;
short numOfCols; short numOfCols;
uint32_t allocSize; uint32_t allocSize;
char * payload; char * payload;
int32_t payloadLen; int32_t payloadLen;
SQueryInfo **pQueryInfo; SQueryInfo **pQueryInfo;
int32_t numOfClause; int32_t numOfClause;
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams; int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
int8_t submitSchema; // submit block is built with table schema
SHashObj *pTableList; // referred table involved in sql
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
} SSqlCmd; } SSqlCmd;
......
...@@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -431,6 +431,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return; return;
} }
tscDebug("%p get tableMeta successfully", pSql);
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
...@@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -446,20 +448,20 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj; SSqlObj * pParObj = trs->pParentSql;
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex && assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0); pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
// NOTE: the vgroupInfo for the queried super table must be existed here.
assert(pTableMetaInfo->vgroupList != NULL);
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return; return;
} }
goto _error;
} else { // continue to process normal async query } else { // continue to process normal async query
if (pCmd->parseFinished) { if (pCmd->parseFinished) {
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql); tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
...@@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -472,18 +474,41 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
// if failed to process sql, go to error handler // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) { // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated
return; // 2. vnode may need the schema information along with submit block to update its local table schema.
if (pCmd->command == TSDB_SQL_INSERT) {
tscDebug("%p redo parse sql string to build submit block", pSql);
pCmd->parseFinished = false;
if ((code = tsParseSql(pSql, true)) == TSDB_CODE_SUCCESS) {
/*
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
* and send the required submit block according to index value in supporter to server.
*/
pSql->fp = pSql->fetchFp; // restore the fp
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
}
} else {// in case of other query type, continue
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
return;
}
} }
// // todo update the submit message according to the new table meta
// // 1. table uid, 2. ip address goto _error;
// code = tscSendMsgToServer(pSql);
// if (code == TSDB_CODE_SUCCESS) return;
} else { } else {
tscDebug("%p continue parse sql after get table meta", pSql); tscDebug("%p continue parse sql after get table meta", pSql);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
...@@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -492,45 +517,49 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else { } else {
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
} }
(*pSql->fp)(pSql->param, pSql, code); (*pSql->fp)(pSql->param, pSql, code);
return; return;
} }
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; // proceed to invoke the tscDoQuery();
} }
} }
} else { // stream computing } else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
pRes->code = code; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return; } else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return;
}
if (pSql->pStream) {
tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) { if (!pSql->cmd.parseFinished) {
tsParseSql(pSql, false); tsParseSql(pSql, false);
sem_post(&pSql->rspSem); sem_post(&pSql->rspSem);
} }
return; return;
} else {
tscDebug("%p get tableMeta successfully", pSql);
} }
tscDoQuery(pSql); tscDoQuery(pSql);
return;
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
}
} }
...@@ -1259,8 +1259,6 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1259,8 +1259,6 @@ int tsParseInsertSql(SSqlObj *pSql) {
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
} }
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
......
...@@ -347,8 +347,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -347,8 +347,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
int doProcessSql(SSqlObj *pSql) { int doProcessSql(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
int32_t code = TSDB_CODE_SUCCESS;
if (pCmd->command == TSDB_SQL_SELECT || if (pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE ||
...@@ -365,10 +364,13 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -365,10 +364,13 @@ int doProcessSql(SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
code = tscSendMsgToServer(pSql); int32_t code = tscSendMsgToServer(pSql);
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRes->code = code; pRes->code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return pRes->code;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
此差异已折叠。
...@@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t ...@@ -562,10 +562,8 @@ int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
int len = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta);
...@@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { ...@@ -575,16 +573,37 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
pDataBlock += sizeof(SSubmitBlk); pDataBlock += sizeof(SSubmitBlk);
int32_t flen = 0; // original total length of row int32_t flen = 0; // original total length of row
for (int32_t i = 0; i < tinfo.numOfColumns; ++i) {
flen += TYPE_BYTES[pSchema[i].type]; // schema needs to be included into the submit data block
if (includeSchema) {
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
for(int32_t j = 0; j < numOfCols; ++j) {
STColumn* pCol = (STColumn*) pDataBlock;
pCol->colId = pSchema[j].colId;
pCol->type = pSchema[j].type;
pCol->bytes = pSchema[j].bytes;
pCol->offset = 0;
pDataBlock += sizeof(STColumn);
flen += TYPE_BYTES[pSchema[j].type];
}
int32_t schemaSize = sizeof(STColumn) * numOfCols;
pBlock->schemaLen = schemaSize;
} else {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
pBlock->schemaLen = 0;
} }
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->len = 0; pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows); int32_t numOfRows = htons(pBlock->numOfRows);
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
SDataRow trow = (SDataRow)pDataBlock; SDataRow trow = (SDataRow) pDataBlock;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
dataRowSetVersion(trow, pTableMeta->sversion); dataRowSetVersion(trow, pTableMeta->sversion);
...@@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { ...@@ -595,20 +614,21 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
p += pSchema[j].bytes; p += pSchema[j].bytes;
} }
// p += pTableDataBlock->rowSize;
pDataBlock += dataRowLen(trow); pDataBlock += dataRowLen(trow);
pBlock->len += dataRowLen(trow); pBlock->dataLen += dataRowLen(trow);
} }
len = pBlock->len; int32_t len = pBlock->dataLen + pBlock->schemaLen;
pBlock->len = htonl(pBlock->len); pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
return len; return len;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
// the expanded size when a row data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; const int32_t MAX_EXPAND_SIZE = TD_DATA_ROW_HEAD_SIZE + TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
...@@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -617,7 +637,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
size_t total = taosArrayGetSize(pTableDataBlockList); size_t total = taosArrayGetSize(pTableDataBlockList);
for (int32_t i = 0; i < total; ++i) { for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = int32_t ret =
...@@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { ...@@ -666,16 +685,17 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion); pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows); pBlocks->numOfRows = htons(pBlocks->numOfRows);
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->submitSchema);
assert(finalLen <= len); assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
// the length does not include the SSubmitBlk structure // the length does not include the SSubmitBlk structure
pBlocks->len = htonl(finalLen); pBlocks->dataLen = htonl(finalLen);
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
} }
......
...@@ -128,10 +128,10 @@ extern float tsTotalLogDirGB; ...@@ -128,10 +128,10 @@ extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB; extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB; extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB; extern float tsAvailLogDirGB;
extern float tsAvailTmpDirGB; extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB; extern float tsAvailDataDirGB;
extern float tsMinimalLogDirGB; extern float tsMinimalLogDirGB;
extern float tsMinimalTmpDirGB; extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB; extern float tsMinimalDataDirGB;
extern int32_t tsTotalMemoryMB; extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion; extern int32_t tsVersion;
......
...@@ -259,7 +259,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { ...@@ -259,7 +259,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
for (int i = 0; i < nEle; i++) { for (int i = 0; i < nEle; i++) {
if (!isNull(varDataVal(tdGetColDataOfRow(pCol, i)), pCol->type)) return false; if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
} }
return true; return true;
default: default:
......
...@@ -43,7 +43,7 @@ int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM; ...@@ -43,7 +43,7 @@ int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
int32_t tsNumOfMnodes = 3; int32_t tsNumOfMnodes = 3;
// common // common
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 1000;
int32_t tsRpcMaxTime = 600; // seconds; int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsMaxShellConns = 5000; int32_t tsMaxShellConns = 5000;
int32_t tsMaxConnections = 5000; int32_t tsMaxConnections = 5000;
...@@ -170,9 +170,9 @@ int64_t tsStreamMax; ...@@ -170,9 +170,9 @@ int64_t tsStreamMax;
int32_t tsNumOfCores = 1; int32_t tsNumOfCores = 1;
float tsTotalTmpDirGB = 0; float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0; float tsTotalDataDirGB = 0;
float tsAvailTmpDirGB = 0; float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0; float tsAvailDataDirGB = 0;
float tsMinimalTmpDirGB = 0.1; float tsReservedTmpDirectorySpace = 0.1;
float tsMinimalDataDirGB = 0.5; float tsMinimalDataDirGB = 0.5;
int32_t tsTotalMemoryMB = 0; int32_t tsTotalMemoryMB = 0;
int32_t tsVersion = 0; int32_t tsVersion = 0;
...@@ -807,7 +807,7 @@ static void doInitGlobalConfig() { ...@@ -807,7 +807,7 @@ static void doInitGlobalConfig() {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "minimalTmpDirGB"; cfg.option = "minimalTmpDirGB";
cfg.ptr = &tsMinimalTmpDirGB; cfg.ptr = &tsReservedTmpDirectorySpace;
cfg.valType = TAOS_CFG_VTYPE_FLOAT; cfg.valType = TAOS_CFG_VTYPE_FLOAT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0.001; cfg.minValue = 0.001;
......
...@@ -283,7 +283,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -283,7 +283,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
} }
tdAppendColVal(trow, val, c->type, c->bytes, c->offset); tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
} }
pBlk->len = htonl(dataRowLen(trow)); pBlk->dataLen = htonl(dataRowLen(trow));
pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid); pBlk->uid = htobe64(pObj->uid);
pBlk->tid = htonl(pObj->tid); pBlk->tid = htonl(pObj->tid);
......
...@@ -39,6 +39,15 @@ ...@@ -39,6 +39,15 @@
#define MPEER_CONTENT_LEN 2000 #define MPEER_CONTENT_LEN 2000
typedef struct {
pthread_t thread;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
int32_t * vnodeList;
} SOpenVnodeThread;
void * tsDnodeTmr = NULL; void * tsDnodeTmr = NULL;
static void * tsStatusTimer = NULL; static void * tsStatusTimer = NULL;
static uint32_t tsRebootTime; static uint32_t tsRebootTime;
...@@ -242,28 +251,86 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) { ...@@ -242,28 +251,86 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t dnodeOpenVnodes() { static void *dnodeOpenVnode(void *param) {
SOpenVnodeThread *pThread = param;
char vnodeDir[TSDB_FILENAME_LEN * 3]; char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0;
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes;
int32_t status;
status = dnodeGetVnodeList(vnodeList, &numOfVnodes); dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
int32_t vgId = pThread->vnodeList[v];
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
if (vnodeOpen(vgId, vnodeDir) < 0) {
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
pThread->failed++;
} else {
dDebug("vgId:%d, is openned by thread:%d", vgId, pThread->threadIndex);
pThread->opened++;
}
}
dDebug("thread:%d, total vnodes:%d, openned:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
int32_t numOfVnodes;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed"); dInfo("get dnode list failed");
free(vnodeList); free(vnodeList);
return status; return status;
} }
for (int32_t i = 0; i < numOfVnodes; ++i) { int32_t threadNum = tsNumOfCores;
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vnodeList[i]); int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
if (vnodeOpen(vnodeList[i], vnodeDir) < 0) failed++; SOpenVnodeThread *threads = calloc(threadNum, sizeof(SOpenVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].vnodeList = calloc(vnodesPerThread, sizeof(int32_t));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SOpenVnodeThread *pThread = &threads[t];
pThread->vnodeList[pThread->vnodeNum++] = vnodeList[v];
}
dDebug("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pThread->thread, &thAttr, dnodeOpenVnode, pThread) != 0) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
pthread_attr_destroy(&thAttr);
}
int32_t openVnodes = 0;
int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) {
pthread_join(pThread->thread, NULL);
}
openVnodes += pThread->opened;
failedVnodes += pThread->failed;
free(pThread->vnodeList);
} }
free(vnodeList); free(vnodeList);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed); free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -273,7 +340,7 @@ void dnodeStartStream() { ...@@ -273,7 +340,7 @@ void dnodeStartStream() {
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes); int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed"); dInfo("get dnode list failed");
return; return;
} }
...@@ -292,7 +359,7 @@ static void dnodeCloseVnodes() { ...@@ -292,7 +359,7 @@ static void dnodeCloseVnodes() {
status = dnodeGetVnodeList(vnodeList, &numOfVnodes); status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) { if (status != TSDB_CODE_SUCCESS) {
dInfo("Get dnode list failed"); dInfo("get dnode list failed");
free(vnodeList); free(vnodeList);
return; return;
} }
......
...@@ -36,6 +36,7 @@ typedef struct { ...@@ -36,6 +36,7 @@ typedef struct {
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
SReadWorker *readWorker; SReadWorker *readWorker;
pthread_mutex_t mutex;
} SReadWorkerPool; } SReadWorkerPool;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
...@@ -51,27 +52,28 @@ int32_t dnodeInitVnodeRead() { ...@@ -51,27 +52,28 @@ int32_t dnodeInitVnodeRead() {
readPool.min = 2; readPool.min = 2;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; readPool.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min;
readPool.readWorker = (SReadWorker *) calloc(sizeof(SReadWorker), readPool.max); readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max);
pthread_mutex_init(&readPool.mutex, NULL);
if (readPool.readWorker == NULL) return -1; if (readPool.readWorker == NULL) return -1;
for (int i=0; i < readPool.max; ++i) { for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
pWorker->workerId = i; pWorker->workerId = i;
} }
dInfo("dnode read is opened"); dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0; return 0;
} }
void dnodeCleanupVnodeRead() { void dnodeCleanupVnodeRead() {
for (int i=0; i < readPool.max; ++i) { for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(readQset); taosQsetThreadResume(readQset);
} }
} }
for (int i=0; i < readPool.max; ++i) { for (int i = 0; i < readPool.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SReadWorker *pWorker = readPool.readWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
...@@ -80,6 +82,7 @@ void dnodeCleanupVnodeRead() { ...@@ -80,6 +82,7 @@ void dnodeCleanupVnodeRead() {
free(readPool.readWorker); free(readPool.readWorker);
taosCloseQset(readQset); taosCloseQset(readQset);
pthread_mutex_destroy(&readPool.mutex);
dInfo("dnode read is closed"); dInfo("dnode read is closed");
} }
...@@ -136,8 +139,12 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -136,8 +139,12 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
void *dnodeAllocateVnodeRqueue(void *pVnode) { void *dnodeAllocateVnodeRqueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) return NULL; if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex);
return NULL;
}
taosAddIntoQset(readQset, queue, pVnode); taosAddIntoQset(readQset, queue, pVnode);
...@@ -160,6 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) { ...@@ -160,6 +167,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) {
} while (readPool.num < readPool.min); } while (readPool.num < readPool.min);
} }
pthread_mutex_unlock(&readPool.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue);
return queue; return queue;
......
...@@ -47,6 +47,7 @@ typedef struct { ...@@ -47,6 +47,7 @@ typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
pthread_mutex_t mutex;
} SWriteWorkerPool; } SWriteWorkerPool;
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessWriteQueue(void *param);
...@@ -58,25 +59,26 @@ int32_t dnodeInitVnodeWrite() { ...@@ -58,25 +59,26 @@ int32_t dnodeInitVnodeWrite() {
wWorkerPool.max = tsNumOfCores; wWorkerPool.max = tsNumOfCores;
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
if (wWorkerPool.writeWorker == NULL) return -1; if (wWorkerPool.writeWorker == NULL) return -1;
pthread_mutex_init(&wWorkerPool.mutex, NULL);
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
wWorkerPool.writeWorker[i].workerId = i; wWorkerPool.writeWorker[i].workerId = i;
} }
dInfo("dnode write is opened"); dInfo("dnode write is opened, max worker %d", wWorkerPool.max);
return 0; return 0;
} }
void dnodeCleanupVnodeWrite() { void dnodeCleanupVnodeWrite() {
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < wWorkerPool.max; ++i) { for (int32_t i = 0; i < wWorkerPool.max; ++i) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + i; SWriteWorker *pWorker = wWorkerPool.writeWorker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
...@@ -84,6 +86,7 @@ void dnodeCleanupVnodeWrite() { ...@@ -84,6 +86,7 @@ void dnodeCleanupVnodeWrite() {
} }
} }
pthread_mutex_destroy(&wWorkerPool.mutex);
free(wWorkerPool.writeWorker); free(wWorkerPool.writeWorker);
dInfo("dnode write is closed"); dInfo("dnode write is closed");
} }
...@@ -124,14 +127,19 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -124,14 +127,19 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
} }
void *dnodeAllocateVnodeWqueue(void *pVnode) { void *dnodeAllocateVnodeWqueue(void *pVnode) {
pthread_mutex_lock(&wWorkerPool.mutex);
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
void *queue = taosOpenQueue(); void *queue = taosOpenQueue();
if (queue == NULL) return NULL; if (queue == NULL) {
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL;
}
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL; return NULL;
} }
...@@ -140,6 +148,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -140,6 +148,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
if (pWorker->qall == NULL) { if (pWorker->qall == NULL) {
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
taosCloseQueue(queue); taosCloseQueue(queue);
pthread_mutex_unlock(&wWorkerPool.mutex);
return NULL; return NULL;
} }
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -163,6 +172,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { ...@@ -163,6 +172,7 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) {
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
pthread_mutex_unlock(&wWorkerPool.mutex);
dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, write queue:%p is allocated", pVnode, queue);
return queue; return queue;
...@@ -201,6 +211,8 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -201,6 +211,8 @@ static void *dnodeProcessWriteQueue(void *param) {
int type; int type;
void *pVnode, *item; void *pVnode, *item;
dDebug("write worker:%d is running", pWorker->workerId);
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
......
...@@ -153,6 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, 0, 0x0369, "mnode tag ...@@ -153,6 +153,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, 0, 0x0369, "mnode tag
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, 0, 0x036A, "mnode tag not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, 0, 0x036A, "mnode tag not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, 0, 0x036B, "mnode field already exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, 0, 0x036B, "mnode field already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, 0, 0x036C, "mnode field not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, 0, 0x036C, "mnode field not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STABLE_NAME, 0, 0x036D, "mnode invalid stable name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, 0, 0x0380, "mnode db not selected") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_SELECTED, 0, 0x0380, "mnode db not selected")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, 0, 0x0381, "mnode database aleady exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, 0, 0x0381, "mnode database aleady exist")
......
...@@ -192,7 +192,8 @@ typedef struct SSubmitBlk { ...@@ -192,7 +192,8 @@ typedef struct SSubmitBlk {
int32_t tid; // table id int32_t tid; // table id
int32_t padding; // TODO just for padding here int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block int16_t numOfRows; // total number of rows in current submit block
char data[]; char data[];
} SSubmitBlk; } SSubmitBlk;
...@@ -618,7 +619,7 @@ typedef struct { ...@@ -618,7 +619,7 @@ typedef struct {
} SMDVnodeDesc; } SMDVnodeDesc;
typedef struct { typedef struct {
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
SMDVnodeCfg cfg; SMDVnodeCfg cfg;
SMDVnodeDesc nodes[TSDB_MAX_REPLICA]; SMDVnodeDesc nodes[TSDB_MAX_REPLICA];
} SMDCreateVnodeMsg; } SMDCreateVnodeMsg;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "tarray.h" #include "tarray.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tname.h" #include "tname.h"
#include "hash.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -168,8 +169,9 @@ typedef struct SDataBlockInfo { ...@@ -168,8 +169,9 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo; } SDataBlockInfo;
typedef struct { typedef struct {
size_t numOfTables; size_t numOfTables;
SArray *pGroupList; SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo from STableId
} STableGroupInfo; } STableGroupInfo;
typedef struct SQueryRowCond { typedef struct SQueryRowCond {
......
...@@ -790,7 +790,7 @@ int isCommentLine(char *line) { ...@@ -790,7 +790,7 @@ int isCommentLine(char *line) {
void source_file(TAOS *con, char *fptr) { void source_file(TAOS *con, char *fptr) {
wordexp_t full_path; wordexp_t full_path;
int read_len = 0; int read_len = 0;
char * cmd = calloc(1, MAX_COMMAND_SIZE); char * cmd = calloc(1, tsMaxSQLStringLen+1);
size_t cmd_len = 0; size_t cmd_len = 0;
char * line = NULL; char * line = NULL;
size_t line_len = 0; size_t line_len = 0;
...@@ -822,7 +822,7 @@ void source_file(TAOS *con, char *fptr) { ...@@ -822,7 +822,7 @@ void source_file(TAOS *con, char *fptr) {
} }
while ((read_len = getline(&line, &line_len, f)) != -1) { while ((read_len = getline(&line, &line_len, f)) != -1) {
if (read_len >= MAX_COMMAND_SIZE) continue; if (read_len >= tsMaxSQLStringLen) continue;
line[--read_len] = '\0'; line[--read_len] = '\0';
if (read_len == 0 || isCommentLine(line)) { // line starts with # if (read_len == 0 || isCommentLine(line)) { // line starts with #
...@@ -839,7 +839,7 @@ void source_file(TAOS *con, char *fptr) { ...@@ -839,7 +839,7 @@ void source_file(TAOS *con, char *fptr) {
memcpy(cmd + cmd_len, line, read_len); memcpy(cmd + cmd_len, line, read_len);
printf("%s%s\n", PROMPT_HEADER, cmd); printf("%s%s\n", PROMPT_HEADER, cmd);
shellRunCommand(con, cmd); shellRunCommand(con, cmd);
memset(cmd, 0, MAX_COMMAND_SIZE); memset(cmd, 0, tsMaxSQLStringLen);
cmd_len = 0; cmd_len = 0;
} }
......
...@@ -132,7 +132,7 @@ typedef struct SVgObj { ...@@ -132,7 +132,7 @@ typedef struct SVgObj {
int64_t createdTime; int64_t createdTime;
int32_t lbDnodeId; int32_t lbDnodeId;
int32_t lbTime; int32_t lbTime;
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int8_t inUse; int8_t inUse;
int8_t accessState; int8_t accessState;
int8_t reserved0[5]; int8_t reserved0[5];
......
...@@ -287,7 +287,7 @@ void sdbUpdateSync() { ...@@ -287,7 +287,7 @@ void sdbUpdateSync() {
SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC; syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC;
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeEp); tstrncpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN);
index++; index++;
} }
...@@ -367,6 +367,7 @@ void sdbCleanUp() { ...@@ -367,6 +367,7 @@ void sdbCleanUp() {
tsSdbObj.status = SDB_STATUS_CLOSING; tsSdbObj.status = SDB_STATUS_CLOSING;
sdbCleanupWriteWorker(); sdbCleanupWriteWorker();
sdbDebug("sdb will be closed, version:%" PRId64, tsSdbObj.version);
if (tsSdbObj.sync) { if (tsSdbObj.sync) {
syncStop(tsSdbObj.sync); syncStop(tsSdbObj.sync);
...@@ -976,11 +977,11 @@ static void *sdbWorkerFp(void *param) { ...@@ -976,11 +977,11 @@ static void *sdbWorkerFp(void *param) {
tstrerror(pOper->retCode)); tstrerror(pOper->retCode));
} }
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
if (pOper != NULL) { if (pOper != NULL) {
sdbDecRef(pOper->table, pOper->pObj); sdbDecRef(pOper->table, pOper->pObj);
} }
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
} }
taosFreeQitem(item); taosFreeQitem(item);
} }
......
...@@ -281,6 +281,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { ...@@ -281,6 +281,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; SCMConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
SCMConnectRsp *pConnectRsp = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
...@@ -309,7 +310,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -309,7 +310,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
} }
SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
if (pConnectRsp == NULL) { if (pConnectRsp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto connect_over; goto connect_over;
...@@ -332,7 +333,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -332,7 +333,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
connect_over: connect_over:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pConnectRsp); if (pConnectRsp) rpcFreeCont(pConnectRsp);
mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
} else { } else {
mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
......
...@@ -382,11 +382,13 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt ...@@ -382,11 +382,13 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
pStable->numOfTables++; pStable->numOfTables++;
if (pStable->vgHash == NULL) { if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(100000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
} }
if (pStable->vgHash != NULL) { if (pStable->vgHash != NULL) {
taosHashPut(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId)); if (taosHashGet(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId)) == NULL) {
taosHashPut(pStable->vgHash, &pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
}
} }
} }
...@@ -457,10 +459,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) { ...@@ -457,10 +459,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbOper *pOper) {
free(pNew); free(pNew);
free(oldTableId); free(oldTableId);
free(oldSchema); free(oldSchema);
mnodeDecTableRef(pTable);
} }
mnodeDecTableRef(pTable);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1254,13 +1255,13 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1254,13 +1255,13 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
char * pWrite; char * pWrite;
int32_t cols = 0; int32_t cols = 0;
SSuperTableObj *pTable = NULL; SSuperTableObj *pTable = NULL;
char prefix[20] = {0}; char prefix[64] = {0};
int32_t prefixLen; int32_t prefixLen;
SDbObj *pDb = mnodeGetDb(pShow->db); SDbObj *pDb = mnodeGetDb(pShow->db);
if (pDb == NULL) return 0; if (pDb == NULL) return 0;
strcpy(prefix, pDb->name); tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
prefixLen = strlen(prefix); prefixLen = strlen(prefix);
...@@ -1558,10 +1559,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO ...@@ -1558,10 +1559,10 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
if (pTable != NULL) { assert(pTable);
mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); mDebug("app:%p:%p, table:%s, create table in id:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pMsg,
} pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
...@@ -1965,9 +1966,15 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) { ...@@ -1965,9 +1966,15 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; SCMTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STagData *pTag = (STagData *)pInfo->tags; STagData *pTags = (STagData *)pInfo->tags;
int32_t tagLen = htonl(pTags->dataLen);
if (pTags->name[0] == 0) {
mError("app:%p:%p, table:%s, failed to create table on demand for stable is empty, tagLen:%d", pMsg->rpcMsg.ahandle,
pMsg, pInfo->tableId, tagLen);
return TSDB_CODE_MND_INVALID_STABLE_NAME;
}
int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + htonl(pTag->dataLen); int32_t contLen = sizeof(SCMCreateTableMsg) + offsetof(STagData, data) + tagLen;
SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen); SCMCreateTableMsg *pCreateMsg = rpcMallocCont(contLen);
if (pCreateMsg == NULL) { if (pCreateMsg == NULL) {
mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle, mError("app:%p:%p, table:%s, failed to create table while get meta info, no enough memory", pMsg->rpcMsg.ahandle,
...@@ -1982,9 +1989,9 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) { ...@@ -1982,9 +1989,9 @@ static int32_t mnodeAutoCreateChildTable(SMnodeMsg *pMsg) {
pCreateMsg->getMeta = 1; pCreateMsg->getMeta = 1;
pCreateMsg->contLen = htonl(contLen); pCreateMsg->contLen = htonl(contLen);
memcpy(pCreateMsg->schema, pInfo->tags, contLen - sizeof(SCMCreateTableMsg)); memcpy(pCreateMsg->schema, pTags, contLen - sizeof(SCMCreateTableMsg));
mDebug("app:%p:%p, table:%s, start to create on demand, stable:%s", pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId, mDebug("app:%p:%p, table:%s, start to create on demand, tagLen:%d stable:%s",
((STagData *)(pCreateMsg->schema))->name); pMsg->rpcMsg.ahandle, pMsg, pInfo->tableId, tagLen, pTags->name);
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE; pMsg->rpcMsg.msgType = TSDB_MSG_TYPE_CM_CREATE_TABLE;
...@@ -2370,10 +2377,21 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2370,10 +2377,21 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0}; char prefix[64] = {0};
strcpy(prefix, pDb->name); tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix); int32_t prefixLen = strlen(prefix);
char* pattern = NULL;
if (pShow->payloadLen > 0) {
pattern = (char*)malloc(pShow->payloadLen + 1);
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return 0;
}
memcpy(pattern, pShow->payload, pShow->payloadLen);
pattern[pShow->payloadLen] = 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -2389,7 +2407,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2389,7 +2407,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
// pattern compare for table name // pattern compare for table name
mnodeExtractTableName(pTable->info.tableId, tableName); mnodeExtractTableName(pTable->info.tableId, tableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pattern != NULL && patternMatch(pattern, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -2433,6 +2451,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -2433,6 +2451,7 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); mnodeVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern);
return numOfRows; return numOfRows;
} }
...@@ -2560,7 +2579,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -2560,7 +2579,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char prefix[64] = {0}; char prefix[64] = {0};
strcpy(prefix, pDb->name); tstrncpy(prefix, pDb->name, 64);
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = strlen(prefix); int32_t prefixLen = strlen(prefix);
......
...@@ -358,7 +358,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) { ...@@ -358,7 +358,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
tstrncpy(pVgroup->dbName, pDb->name, TSDB_DB_NAME_LEN); tstrncpy(pVgroup->dbName, pDb->name, TSDB_ACCT_LEN + TSDB_DB_NAME_LEN);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs(); pVgroup->createdTime = taosGetTimestampMs();
pVgroup->accessState = TSDB_VN_ALL_ACCCESS; pVgroup->accessState = TSDB_VN_ALL_ACCCESS;
......
...@@ -326,12 +326,12 @@ bool taosGetDisk() { ...@@ -326,12 +326,12 @@ bool taosGetDisk() {
if (statvfs("/tmp", &info)) { if (statvfs("/tmp", &info)) {
//tsTotalTmpDirGB = 0; //tsTotalTmpDirGB = 0;
//tsAvailTmpDirGB = 0; //tsAvailTmpDirectorySpace = 0;
uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno)); uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno));
return false; return false;
} else { } else {
tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit); tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit);
tsAvailTmpDirGB = (float)((double)info.f_bavail * (double)info.f_frsize / unit); tsAvailTmpDirectorySpace = (float)((double)info.f_bavail * (double)info.f_frsize / unit);
} }
return true; return true;
......
...@@ -359,6 +359,8 @@ void httpExecCmd(HttpContext *pContext) { ...@@ -359,6 +359,8 @@ void httpExecCmd(HttpContext *pContext) {
void httpProcessRequestCb(void *param, TAOS_RES *result, int code) { void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = param; HttpContext *pContext = param;
taos_free_result(result);
if (pContext == NULL) return; if (pContext == NULL) return;
if (code < 0) { if (code < 0) {
......
...@@ -161,12 +161,12 @@ typedef struct SQuery { ...@@ -161,12 +161,12 @@ typedef struct SQuery {
} SQuery; } SQuery;
typedef struct SQueryRuntimeEnv { typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery; SQuery* pQuery;
SQLFunctionCtx* pCtx; SQLFunctionCtx* pCtx;
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
uint16_t scanFlag; // denotes reversed scan of data or not uint16_t scanFlag; // denotes reversed scan of data or not
SFillInfo* pFillInfo; SFillInfo* pFillInfo;
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
STSBuf* pTSBuf; STSBuf* pTSBuf;
...@@ -176,7 +176,8 @@ typedef struct SQueryRuntimeEnv { ...@@ -176,7 +176,8 @@ typedef struct SQueryRuntimeEnv {
void* pQueryHandle; void* pQueryHandle;
void* pSecQueryHandle; // another thread for void* pSecQueryHandle; // another thread for
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
bool topBotQuery; // false; bool topBotQuery; // false
int32_t prevGroupId; // previous executed group id
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {
......
此差异已折叠。
...@@ -133,7 +133,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { ...@@ -133,7 +133,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
} }
pWindowResInfo->size = remain; pWindowResInfo->size = remain;
printf("---------------size:%ld\n", taosHashGetSize(pWindowResInfo->hashList));
for (int32_t k = 0; k < pWindowResInfo->size; ++k) { for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = &pWindowResInfo->pResult[k]; SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
......
...@@ -209,8 +209,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -209,8 +209,7 @@ int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
return 0; return 0;
} }
return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) return FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->numOfRows - pFillInfo->rowIdx) : pFillInfo->rowIdx + 1;
: pFillInfo->rowIdx + 1;
} }
// todo: refactor // todo: refactor
......
...@@ -631,5 +631,5 @@ void exprSerializeTest2() { ...@@ -631,5 +631,5 @@ void exprSerializeTest2() {
} }
} // namespace } // namespace
TEST(testCase, astTest) { TEST(testCase, astTest) {
exprSerializeTest2(); // exprSerializeTest2();
} }
\ No newline at end of file
...@@ -665,6 +665,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -665,6 +665,12 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
return pConn; return pConn;
} }
// if code is not 0, it means it is simple reqhead, just ignore
if (pHead->code != 0) {
terrno = TSDB_CODE_RPC_ALREADY_PROCESSED;
return NULL;
}
int sid = taosAllocateId(pRpc->idPool); int sid = taosAllocateId(pRpc->idPool);
if (sid <= 0) { if (sid <= 0) {
tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions); tError("%s maximum number of sessions:%d is reached", pRpc->label, pRpc->sessions);
...@@ -1028,15 +1034,20 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1028,15 +1034,20 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
rpcMsg.ahandle = pConn->ahandle; rpcMsg.ahandle = pConn->ahandle;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.handle = pConn; if (rpcMsg.contLen > 0) {
rpcAddRef(pRpc); // add the refCount for requests rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
// start the progress timer to monitor the response from server app // start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS) if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
// notify the server app // notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL); (*(pRpc->cfp))(&rpcMsg, NULL);
} else {
tDebug("%s, message body is empty, ignore", pConn->info);
rpcFreeCont(rpcMsg.pCont);
}
} else { } else {
// it's a response // it's a response
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
......
...@@ -419,7 +419,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -419,7 +419,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
// tDebug("malloc mem: %p", buffer); tDebug("TCP malloc mem: %p", buffer);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
......
...@@ -212,7 +212,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -212,7 +212,7 @@ static void *taosRecvUdpData(void *param) {
tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen); tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen);
continue; continue;
} else { } else {
// tTrace("malloc mem: %p", tmsg); tDebug("UDP malloc mem: %p", tmsg);
} }
tmsg += tsRpcOverhead; // overhead for SRpcReqContext tmsg += tsRpcOverhead; // overhead for SRpcReqContext
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "tutil.h" #include "tutil.h"
#include "ttime.h" #include "ttime.h"
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".h"}; const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile); static void tsdbDestroyFile(SFile *pFile);
......
...@@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { ...@@ -768,7 +768,8 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
SSubmitBlk *pBlock = pIter->pBlock; SSubmitBlk *pBlock = pIter->pBlock;
if (pBlock == NULL) return NULL; if (pBlock == NULL) return NULL;
pBlock->len = htonl(pBlock->len); pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows); pBlock->numOfRows = htons(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid); pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid); pBlock->tid = htonl(pBlock->tid);
...@@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { ...@@ -776,11 +777,11 @@ static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
pBlock->sversion = htonl(pBlock->sversion); pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding); pBlock->padding = htonl(pBlock->padding);
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL; pIter->pBlock = NULL;
} else { } else {
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk)); pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk));
} }
return pBlock; return pBlock;
...@@ -832,10 +833,10 @@ _err: ...@@ -832,10 +833,10 @@ _err:
} }
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->len <= 0) return -1; if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->len; pIter->totalLen = pBlock->dataLen;
pIter->len = 0; pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data); pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
return 0; return 0;
} }
......
...@@ -111,7 +111,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ...@@ -111,7 +111,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
} }
tsdbDebug("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key); TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), key);
return 0; return 0;
...@@ -443,12 +443,14 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { ...@@ -443,12 +443,14 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno)); tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err; goto _err;
} }
} else if (pAct->act == TSDB_DROP_META) { } else if (pAct->act == TSDB_DROP_META) {
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
tstrerror(terrno)); tstrerror(terrno));
tdKVStoreEndCommit(pMeta->pStore);
goto _err; goto _err;
} }
} else { } else {
......
...@@ -480,13 +480,11 @@ int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) { ...@@ -480,13 +480,11 @@ int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
bool changed = false; bool changed = false;
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
if (pTable->type == TSDB_SUPER_TABLE) { if ((pTable->type == TSDB_SUPER_TABLE) && (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema))) {
if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) { if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) { tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tstrerror(terrno));
tstrerror(terrno)); return -1;
return -1;
}
} }
changed = true; changed = true;
} }
...@@ -552,13 +550,13 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { ...@@ -552,13 +550,13 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
} }
void tsdbRefTable(STable *pTable) { void tsdbRefTable(STable *pTable) {
int16_t ref = T_REF_INC(pTable); int32_t ref = T_REF_INC(pTable);
UNUSED(ref); UNUSED(ref);
// tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); // tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
} }
void tsdbUnRefTable(STable *pTable) { void tsdbUnRefTable(STable *pTable) {
int16_t ref = T_REF_DEC(pTable); int32_t ref = T_REF_DEC(pTable);
tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
if (ref == 0) { if (ref == 0) {
...@@ -598,7 +596,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { ...@@ -598,7 +596,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
return -1; return -1;
} }
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
return 0; return 0;
} }
...@@ -799,7 +797,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { ...@@ -799,7 +797,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable)); pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable));
} }
tsdbDebug("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
TABLE_TID(pTable), TABLE_UID(pTable)); TABLE_TID(pTable), TABLE_UID(pTable));
return 0; return 0;
...@@ -1215,7 +1213,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) { ...@@ -1215,7 +1213,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
while (tSkipListIterNext(pIter)) { while (tSkipListIterNext(pIter)) {
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter)); STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE); ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable); pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable);
} }
tSkipListDestroyIter(pIter); tSkipListDestroyIter(pIter);
...@@ -1254,4 +1252,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { ...@@ -1254,4 +1252,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -121,18 +121,19 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { ...@@ -121,18 +121,19 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
errno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
// Create and open .l file if should // Create and open .l file if should
if (tsdbShouldCreateNewLast(pHelper)) { if (tsdbShouldCreateNewLast(pHelper)) {
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno)); TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
}
} }
} else { } else {
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err; if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
......
...@@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo; pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
...@@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK ...@@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
} }
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
// todo check open file failed
SFileGroup* fileGroup = pQueryHandle->pFileGroup; SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
//open file failed, return error code to client
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// load all the comp offset value for all tables in this file // load all the comp offset value for all tables in this file
*numOfBlocks = 0; *numOfBlocks = 0;
...@@ -538,17 +542,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -538,17 +542,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { #define GET_FILE_DATA_BLOCK_INFO(_checkInfo, _block) \
SDataBlockInfo info = { ((SDataBlockInfo){.window = {.skey = (_block)->keyFirst, .ekey = (_block)->keyLast}, \
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast}, .numOfCols = (_block)->numOfCols, \
.numOfCols = pBlock->numOfCols, .rows = (_block)->numOfRows, \
.rows = pBlock->numOfRows, .tid = (_checkInfo)->tableId.tid, \
.tid = pCheckInfo->tableId.tid, .uid = (_checkInfo)->tableId.uid})
.uid = pCheckInfo->tableId.uid,
};
return info;
}
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) { static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle); size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
...@@ -593,8 +592,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -593,8 +592,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
// TODO // TODO
pCheckInfo->pDataCols = pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
} }
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
...@@ -623,7 +621,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo ...@@ -623,7 +621,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo); SDataRow row = getSDataRowInTableMem(pCheckInfo);
...@@ -943,7 +941,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -943,7 +941,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
SArray* sa) { SArray* sa) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo blockInfo = getTrueDataBlockInfo(pCheckInfo, pBlock); SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
initTableMemIterator(pQueryHandle, pCheckInfo); initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
...@@ -1319,8 +1317,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1319,8 +1317,8 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables; sup.numOfTables = numOfQualTables;
SLoserTreeInfo* pTree = NULL;
SLoserTreeInfo* pTree = NULL;
uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
cleanBlockOrderSupporter(&sup, numOfTables); cleanBlockOrderSupporter(&sup, numOfTables);
...@@ -1359,16 +1357,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO ...@@ -1359,16 +1357,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
} }
// todo opt for only one table case // todo opt for only one table case
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0; pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
break; break;
} }
...@@ -1393,20 +1393,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { ...@@ -1393,20 +1393,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
// no data in file anymore // no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) { if (pQueryHandle->numOfBlocks <= 0) {
assert(pQueryHandle->pFileGroup == NULL); if (code == TSDB_CODE_SUCCESS) {
assert(pQueryHandle->pFileGroup == NULL);
}
cur->fid = -1; // denote that there are no data in file anymore cur->fid = -1; // denote that there are no data in file anymore
*exists = false;
return false; return code;
} }
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
...@@ -1419,7 +1424,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1419,7 +1424,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// check if current file block is all consumed // check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
...@@ -1430,7 +1435,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1430,7 +1435,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists // all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// next block of the same file // next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
...@@ -1440,11 +1445,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { ...@@ -1440,11 +1445,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
return pQueryHandle->realNumOfRows > 0; *exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS;
} }
} }
} }
...@@ -1576,8 +1585,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1576,8 +1585,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
if (getDataBlocksInFiles(pQueryHandle)) { bool exists = true;
return true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
return exists;
} }
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;
...@@ -1824,7 +1839,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1824,7 +1839,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
if (pHandle->cur.mixBlock) { if (pHandle->cur.mixBlock) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlockInfo->compBlock); SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows); assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method // data block has been loaded, todo extract method
......
...@@ -22,7 +22,7 @@ typedef void (*_ref_fn_t)(const void* pObj); ...@@ -22,7 +22,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
#define T_REF_DECLARE() \ #define T_REF_DECLARE() \
struct { \ struct { \
int16_t val; \ int32_t val; \
} _ref; } _ref;
#define T_REF_REGISTER_FUNC(s, e) \ #define T_REF_REGISTER_FUNC(s, e) \
...@@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj); ...@@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj);
_ref_fn_t end; \ _ref_fn_t end; \
} _ref_func = {.begin = (s), .end = (e)}; } _ref_func = {.begin = (s), .end = (e)};
#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)) #define T_REF_INC(x) (atomic_add_fetch_32(&((x)->_ref.val), 1))
#define T_REF_INC_WITH_CB(x, p) \ #define T_REF_INC_WITH_CB(x, p) \
do { \ do { \
...@@ -41,11 +41,11 @@ typedef void (*_ref_fn_t)(const void* pObj); ...@@ -41,11 +41,11 @@ typedef void (*_ref_fn_t)(const void* pObj);
} \ } \
} while (0) } while (0)
#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)) #define T_REF_DEC(x) (atomic_sub_fetch_32(&((x)->_ref.val), 1))
#define T_REF_DEC_WITH_CB(x, p) \ #define T_REF_DEC_WITH_CB(x, p) \
do { \ do { \
int32_t v = atomic_sub_fetch_16(&((x)->_ref.val), 1); \ int32_t v = atomic_sub_fetch_32(&((x)->_ref.val), 1); \
if (v == 0 && (p)->_ref_func.end != NULL) { \ if (v == 0 && (p)->_ref_func.end != NULL) { \
(p)->_ref_func.end((x)); \ (p)->_ref_func.end((x)); \
} \ } \
......
...@@ -415,7 +415,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -415,7 +415,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} }
*data = NULL; *data = NULL;
int16_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
uDebug("%p data released, refcnt:%d", pNode, ref); uDebug("%p data released, refcnt:%d", pNode, ref);
if (_remove) { if (_remove) {
......
...@@ -259,6 +259,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe ...@@ -259,6 +259,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
} }
taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
uDebug("put uid %" PRIu64 " into kvStore %s", uid, pStore->fname);
return 0; return 0;
} }
...@@ -292,6 +293,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) { ...@@ -292,6 +293,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid)); taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid));
uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname);
return 0; return 0;
} }
......
...@@ -68,10 +68,15 @@ void taosCloseQueue(taos_queue param) { ...@@ -68,10 +68,15 @@ void taosCloseQueue(taos_queue param) {
if (param == NULL) return; if (param == NULL) return;
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pTemp; STaosQnode *pTemp;
STaosQset *qset;
pthread_mutex_lock(&queue->mutex);
STaosQnode *pNode = queue->head; STaosQnode *pNode = queue->head;
queue->head = NULL; queue->head = NULL;
qset = queue->qset;
pthread_mutex_unlock(&queue->mutex);
if (queue->qset) taosRemoveFromQset(queue->qset, queue); if (queue->qset) taosRemoveFromQset(qset, queue);
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
...@@ -95,7 +100,7 @@ void *taosAllocateQitem(int size) { ...@@ -95,7 +100,7 @@ void *taosAllocateQitem(int size) {
void taosFreeQitem(void *param) { void taosFreeQitem(void *param) {
if (param == NULL) return; if (param == NULL) return;
uDebug("item:%p is freed", param); uTrace("item:%p is freed", param);
char *temp = (char *)param; char *temp = (char *)param;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
free(temp); free(temp);
...@@ -119,7 +124,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { ...@@ -119,7 +124,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uDebug("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems); uTrace("item:%p is put into queue:%p, type:%d items:%d", item, queue, type, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -201,7 +206,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { ...@@ -201,7 +206,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
*pitem = pNode->item; *pitem = pNode->item;
*type = pNode->type; *type = pNode->type;
num = 1; num = 1;
uDebug("item:%p is fetched, type:%d", *pitem, *type); uTrace("item:%p is fetched, type:%d", *pitem, *type);
} }
return num; return num;
...@@ -339,7 +344,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand ...@@ -339,7 +344,7 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
queue->numOfItems--; queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uDebug("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems); uTrace("item:%p is read out from queue:%p, type:%d items:%d", *pitem, queue, *type, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
......
...@@ -60,7 +60,7 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0, ...@@ -60,7 +60,7 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec); // year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
int64_t res; int64_t res;
res = 367*((int64_t)mon)/12; res = 367*((int64_t)mon)/12;
res += year/4 - year/100 + year/400 + day + year*365 - 719499; res += year/4 - year/100 + year/400 + day + ((int64_t)year)*365 - 719499;
res = res*24; res = res*24;
res = ((res + hour) * 60 + min) * 60 + sec; res = ((res + hour) * 60 + min) * 60 + sec;
......
...@@ -123,9 +123,8 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -123,9 +123,8 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg); if (tsdbCreateRepo(tsdbDir, &tsdbCfg) < 0) {
if (code != TSDB_CODE_SUCCESS) { vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
return TSDB_CODE_VND_INIT_FAILED; return TSDB_CODE_VND_INIT_FAILED;
} }
...@@ -601,10 +600,11 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -601,10 +600,11 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
content = calloc(1, maxLen + 1); content = calloc(1, maxLen + 1);
if (content == NULL) goto PARSE_OVER; if (content == NULL) goto PARSE_OVER;
int len = fread(content, 1, maxLen, fp); int len = fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
free(content); free(content);
fclose(fp);
return errno; return errno;
} }
...@@ -649,7 +649,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -649,7 +649,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
pVnode->tsdbCfg.maxTables = maxTables->valueint; pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile"); cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
if (!daysPerFile || daysPerFile->type != cJSON_Number) { if (!daysPerFile || daysPerFile->type != cJSON_Number) {
vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId); vError("vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode->vgId);
goto PARSE_OVER; goto PARSE_OVER;
......
...@@ -137,7 +137,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -137,7 +137,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
vDebug("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query
} }
......
...@@ -94,7 +94,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -94,7 +94,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item // save insert result into item
vDebug("vgId:%d, submit msg is processed", pVnode->vgId); vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
pRet->len = sizeof(SShellSubmitRspMsg); pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len); pRet->rsp = rpcMallocCont(pRet->len);
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
4. pip install ../src/connector/python/linux/python2 ; pip3 install 4. pip install ../src/connector/python/linux/python2 ; pip3 install
../src/connector/python/linux/python3 ../src/connector/python/linux/python3
5. pip install numpy; pip3 install numpy 5. pip install numpy; pip3 install numpy (numpy is required only if you need to run querySort.py)
> Note: Both Python2 and Python3 are currently supported by the Python test > Note: Both Python2 and Python3 are currently supported by the Python test
> framework. Since Python2 is no longer officially supported by Python Software > framework. Since Python2 is no longer officially supported by Python Software
......
...@@ -77,11 +77,7 @@ class TDTestCase: ...@@ -77,11 +77,7 @@ class TDTestCase:
# join queries # join queries
tdSql.query( tdSql.query(
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") "select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
tdSql.checkRows(6) tdSql.checkRows(6)
tdSql.query(
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id order by ts desc")
tdSql.checkColumnSorted(0, "desc")
tdSql.error( tdSql.error(
"select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") "select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
......
...@@ -16,6 +16,7 @@ import taos ...@@ -16,6 +16,7 @@ import taos
from util.log import * from util.log import *
from util.cases import * from util.cases import *
from util.sql import * from util.sql import *
import numpy as np
class TDTestCase: class TDTestCase:
...@@ -26,6 +27,46 @@ class TDTestCase: ...@@ -26,6 +27,46 @@ class TDTestCase:
self.rowNum = 10 self.rowNum = 10
self.ts = 1537146000000 self.ts = 1537146000000
def checkColumnSorted(self, col, order):
frame = inspect.stack()[1]
callerModule = inspect.getmodule(frame[0])
callerFilename = callerModule.__file__
if col < 0:
tdLog.exit(
"%s failed: sql:%s, col:%d is smaller than zero" %
(callerFilename, tdSql.sql, col))
if col > tdSql.queryCols:
tdLog.exit(
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
(callerFilename, tdSql.sql, col, tdSql.queryCols))
matrix = np.array(tdSql.queryResult)
list = matrix[:, 0]
if order == "" or order.upper() == "ASC":
if all(sorted(list) == list):
tdLog.info(
"sql:%s, column :%d is sorted in accending order as expected" %
(tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
(callerFilename, tdSql.sql, col))
elif order.upper() == "DESC":
if all(sorted(list, reverse=True) == list):
tdLog.info(
"sql:%s, column :%d is sorted in decending order as expected" %
(tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in decending order" %
(callerFilename, tdSql.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, the order provided for col:%d is not correct" %
(callerFilename, tdSql.sql, col))
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
...@@ -49,11 +90,11 @@ class TDTestCase: ...@@ -49,11 +90,11 @@ class TDTestCase:
print("======= step 2: verify order for each column =========") print("======= step 2: verify order for each column =========")
# sort for timestamp in asc order # sort for timestamp in asc order
tdSql.query("select * from st order by ts asc") tdSql.query("select * from st order by ts asc")
tdSql.checkColumnSorted(0, "asc") self.checkColumnSorted(0, "asc")
# sort for timestamp in desc order # sort for timestamp in desc order
tdSql.query("select * from st order by ts desc") tdSql.query("select * from st order by ts desc")
tdSql.checkColumnSorted(0, "desc") self.checkColumnSorted(0, "desc")
for i in range(1, 10): for i in range(1, 10):
tdSql.error("select * from st order by tbcol%d" % i) tdSql.error("select * from st order by tbcol%d" % i)
...@@ -63,17 +104,17 @@ class TDTestCase: ...@@ -63,17 +104,17 @@ class TDTestCase:
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "") self.checkColumnSorted(1, "")
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "asc") self.checkColumnSorted(1, "asc")
tdSql.query( tdSql.query(
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" % "select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" %
(i, i)) (i, i))
tdSql.checkColumnSorted(1, "desc") self.checkColumnSorted(1, "desc")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -43,7 +43,7 @@ python3 ./test.py -f tag_lite/commit.py ...@@ -43,7 +43,7 @@ python3 ./test.py -f tag_lite/commit.py
python3 ./test.py -f tag_lite/create.py python3 ./test.py -f tag_lite/create.py
python3 ./test.py -f tag_lite/datatype.py python3 ./test.py -f tag_lite/datatype.py
python3 ./test.py -f tag_lite/datatype-without-alter.py python3 ./test.py -f tag_lite/datatype-without-alter.py
# python3 ./test.py -f tag_lite/delete.py python3 ./test.py -f tag_lite/delete.py
python3 ./test.py -f tag_lite/double.py python3 ./test.py -f tag_lite/double.py
python3 ./test.py -f tag_lite/float.py python3 ./test.py -f tag_lite/float.py
python3 ./test.py -f tag_lite/int_binary.py python3 ./test.py -f tag_lite/int_binary.py
...@@ -134,9 +134,10 @@ python3 ./test.py -f table/del_stable.py ...@@ -134,9 +134,10 @@ python3 ./test.py -f table/del_stable.py
python3 ./test.py -f query/filter.py python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterAllIntTypes.py python3 ./test.py -f query/filterAllIntTypes.py
python3 ./test.py -f query/filterFloatAndDouble.py python3 ./test.py -f query/filterFloatAndDouble.py
python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/queryError.py
python3 ./test.py -f query/querySort.py python3 ./test.py -f query/querySort.py
#stream #stream
python3 ./test.py -f stream/stream1.py python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py python3 ./test.py -f stream/stream2.py
......
...@@ -8,24 +8,8 @@ python3 ./test.py $1 -s && sleep 1 ...@@ -8,24 +8,8 @@ python3 ./test.py $1 -s && sleep 1
# insert # insert
python3 ./test.py $1 -f insert/basic.py python3 ./test.py $1 -f insert/basic.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/int.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/float.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bigint.py python3 ./test.py $1 -f insert/bigint.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/bool.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/double.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/smallint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/tinyint.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/binary.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/date.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/nchar.py python3 ./test.py $1 -f insert/nchar.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f insert/multi.py python3 ./test.py $1 -f insert/multi.py
...@@ -42,18 +26,6 @@ python3 ./test.py $1 -s && sleep 1 ...@@ -42,18 +26,6 @@ python3 ./test.py $1 -s && sleep 1
# import # import
python3 ./test.py $1 -f import_merge/importDataLastSub.py python3 ./test.py $1 -f import_merge/importDataLastSub.py
python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importHead.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importLastT.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importSpan.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importTail.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importTRestart.py
python3 ./test.py $1 -s && sleep 1
python3 ./test.py $1 -f import_merge/importInsertThenImport.py
python3 ./test.py $1 -s && sleep 1
#tag #tag
python3 ./test.py $1 -f tag_lite/filter.py python3 ./test.py $1 -f tag_lite/filter.py
......
...@@ -17,7 +17,6 @@ import time ...@@ -17,7 +17,6 @@ import time
import datetime import datetime
import inspect import inspect
from util.log import * from util.log import *
import numpy as np
class TDSql: class TDSql:
...@@ -199,47 +198,7 @@ class TDSql: ...@@ -199,47 +198,7 @@ class TDSql:
"%s failed: sql:%s, affectedRows:%d != expect:%d" % "%s failed: sql:%s, affectedRows:%d != expect:%d" %
(callerFilename, self.sql, self.affectedRows, expectAffectedRows)) (callerFilename, self.sql, self.affectedRows, expectAffectedRows))
tdLog.info("sql:%s, affectedRows:%d == expect:%d" % tdLog.info("sql:%s, affectedRows:%d == expect:%d" %
(self.sql, self.affectedRows, expectAffectedRows)) (self.sql, self.affectedRows, expectAffectedRows))
def checkColumnSorted(self, col, order):
frame = inspect.stack()[1]
callerModule = inspect.getmodule(frame[0])
callerFilename = callerModule.__file__
if col < 0:
tdLog.exit(
"%s failed: sql:%s, col:%d is smaller than zero" %
(callerFilename, self.sql, col))
if col > self.queryCols:
tdLog.exit(
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
(callerFilename, self.sql, col, self.queryCols))
matrix = np.array(self.queryResult)
list = matrix[:, 0]
if order == "" or order.upper() == "ASC":
if all(sorted(list) == list):
tdLog.info(
"sql:%s, column :%d is sorted in accending order as expected" %
(self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
(callerFilename, self.sql, col))
elif order.upper() == "DESC":
if all(sorted(list, reverse=True) == list):
tdLog.info(
"sql:%s, column :%d is sorted in decending order as expected" %
(self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, col:%d is not sorted in decending order" %
(callerFilename, self.sql, col))
else:
tdLog.exit(
"%s failed: sql:%s, the order provided for col:%d is not correct" %
(callerFilename, self.sql, col))
tdSql = TDSql() tdSql = TDSql()
run general/cache/new_metrics.sim run general/cache/new_metrics.sim
run general/column/commit.sim
run general/compress/compress.sim
run general/compute/interval.sim run general/compute/interval.sim
run general/db/basic4.sim run general/http/restful_full.sim
run general/field/binary.sim
run general/http/restful_insert.sim
run general/import/commit.sim run general/import/commit.sim
run general/import/replica1.sim run general/import/replica1.sim
run general/parser/auto_create_tb_drop_tb.sim run general/parser/auto_create_tb_drop_tb.sim
run general/parser/binary_escapeCharacter.sim run general/parser/binary_escapeCharacter.sim
run general/parser/select_from_cache_disk.sim run general/parser/select_from_cache_disk.sim
run general/parser/null_char.sim
run general/parser/alter.sim
run general/stable/vnode3.sim run general/stable/vnode3.sim
run general/table/autocreate.sim
run general/table/fill.sim
run general/table/vgroup.sim
run general/tag/filter.sim run general/tag/filter.sim
run general/table/vgroup.sim
run general/user/authority.sim run general/user/authority.sim
run general/user/pass_alter.sim
run general/vector/metrics_mix.sim run general/vector/metrics_mix.sim
run general/vector/table_field.sim run general/vector/table_field.sim
run general/user/authority.sim
run general/tag/set.sim
run general/table/delete_writing.sim
run general/stable/disk.sim
...@@ -232,24 +232,3 @@ cd ../../../debug; make ...@@ -232,24 +232,3 @@ cd ../../../debug; make
./test.sh -f general/vector/table_mix.sim ./test.sh -f general/vector/table_mix.sim
./test.sh -f general/vector/table_query.sim ./test.sh -f general/vector/table_query.sim
./test.sh -f general/vector/table_time.sim ./test.sh -f general/vector/table_time.sim
./test.sh -f unique/account/account_create.sim
./test.sh -f unique/account/account_delete.sim
./test.sh -f unique/account/account_len.sim
./test.sh -f unique/account/authority.sim
./test.sh -f unique/account/basic.sim
./test.sh -f unique/account/paras.sim
./test.sh -f unique/account/pass_alter.sim
./test.sh -f unique/account/pass_len.sim
./test.sh -f unique/account/usage.sim
./test.sh -f unique/account/user_create.sim
./test.sh -f unique/account/user_len.sim
./test.sh -f unique/cluster/balance1.sim
./test.sh -f unique/cluster/balance2.sim
./test.sh -f unique/dnode/balance1.sim
./test.sh -f unique/dnode/balance2.sim
./test.sh -f unique/stable/dnode3.sim
./test.sh -f unique/mnode/mgmt22.sim
./test.sh -f unique/mnode/mgmt33.sim
./test.sh -f unique/vnode/many.sim
\ No newline at end of file
...@@ -102,9 +102,10 @@ cd ../../../debug; make ...@@ -102,9 +102,10 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim ./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_createErrData_online.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim ./test.sh -f unique/arbitrator/dn3_mn1_vnode_noCorruptFile_offline.sim
./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim ./test.sh -f unique/arbitrator/dn3_mn1_vnode_delDir.sim
./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim ./test.sh -f unique/arbitrator/dn3_mn1_r2_vnode_delDir.sim
......
##unsupport run general/alter/cached_schema_after_alter.sim ##unsupport run general/alter/cached_schema_after_alter.sim
unsupport run general/alter/count.sim run general/alter/count.sim
unsupport run general/alter/import.sim run general/alter/import.sim
##unsupport run general/alter/insert1.sim ##unsupport run general/alter/insert1.sim
unsupport run general/alter/insert2.sim run general/alter/insert2.sim
unsupport run general/alter/metrics.sim run general/alter/metrics.sim
unsupport run general/alter/table.sim run general/alter/table.sim
run general/cache/new_metrics.sim run general/cache/new_metrics.sim
run general/cache/restart_metrics.sim run general/cache/restart_metrics.sim
run general/cache/restart_table.sim run general/cache/restart_table.sim
...@@ -86,14 +86,14 @@ run general/insert/query_block2_file.sim ...@@ -86,14 +86,14 @@ run general/insert/query_block2_file.sim
run general/insert/query_file_memory.sim run general/insert/query_file_memory.sim
run general/insert/query_multi_file.sim run general/insert/query_multi_file.sim
run general/insert/tcp.sim run general/insert/tcp.sim
##unsupport run general/parser/alter.sim run general/parser/alter.sim
run general/parser/alter1.sim run general/parser/alter1.sim
run general/parser/alter_stable.sim run general/parser/alter_stable.sim
run general/parser/auto_create_tb.sim run general/parser/auto_create_tb.sim
run general/parser/auto_create_tb_drop_tb.sim run general/parser/auto_create_tb_drop_tb.sim
run general/parser/col_arithmetic_operation.sim run general/parser/col_arithmetic_operation.sim
run general/parser/columnValue.sim run general/parser/columnValue.sim
#run general/parser/commit.sim run general/parser/commit.sim
run general/parser/create_db.sim run general/parser/create_db.sim
run general/parser/create_mt.sim run general/parser/create_mt.sim
run general/parser/create_tb.sim run general/parser/create_tb.sim
...@@ -106,7 +106,7 @@ run general/parser/first_last.sim ...@@ -106,7 +106,7 @@ run general/parser/first_last.sim
##unsupport run general/parser/import_file.sim ##unsupport run general/parser/import_file.sim
run general/parser/lastrow.sim run general/parser/lastrow.sim
run general/parser/nchar.sim run general/parser/nchar.sim
##unsupport run general/parser/null_char.sim run general/parser/null_char.sim
run general/parser/single_row_in_tb.sim run general/parser/single_row_in_tb.sim
run general/parser/select_from_cache_disk.sim run general/parser/select_from_cache_disk.sim
run general/parser/limit.sim run general/parser/limit.sim
...@@ -132,7 +132,7 @@ run general/parser/groupby.sim ...@@ -132,7 +132,7 @@ run general/parser/groupby.sim
run general/parser/bug.sim run general/parser/bug.sim
run general/parser/tags_dynamically_specifiy.sim run general/parser/tags_dynamically_specifiy.sim
run general/parser/set_tag_vals.sim run general/parser/set_tag_vals.sim
##unsupport run general/parser/repeatAlter.sim run general/parser/repeatAlter.sim
##unsupport run general/parser/slimit_alter_tags.sim ##unsupport run general/parser/slimit_alter_tags.sim
##unsupport run general/parser/stream_on_sys.sim ##unsupport run general/parser/stream_on_sys.sim
run general/parser/stream.sim run general/parser/stream.sim
...@@ -142,6 +142,8 @@ run general/stable/dnode3.sim ...@@ -142,6 +142,8 @@ run general/stable/dnode3.sim
run general/stable/metrics.sim run general/stable/metrics.sim
run general/stable/values.sim run general/stable/values.sim
run general/stable/vnode3.sim run general/stable/vnode3.sim
run general/stable/refcount.sim
run general/stable/show.sim
run general/table/autocreate.sim run general/table/autocreate.sim
run general/table/basic1.sim run general/table/basic1.sim
run general/table/basic2.sim run general/table/basic2.sim
......
...@@ -112,7 +112,7 @@ echo "numOfLogLines 100000000" >> $TAOS_CFG ...@@ -112,7 +112,7 @@ echo "numOfLogLines 100000000" >> $TAOS_CFG
echo "dDebugFlag 135" >> $TAOS_CFG echo "dDebugFlag 135" >> $TAOS_CFG
echo "mDebugFlag 135" >> $TAOS_CFG echo "mDebugFlag 135" >> $TAOS_CFG
echo "sdbDebugFlag 135" >> $TAOS_CFG echo "sdbDebugFlag 135" >> $TAOS_CFG
echo "rpcDebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 135" >> $TAOS_CFG
echo "tmrDebugFlag 131" >> $TAOS_CFG echo "tmrDebugFlag 131" >> $TAOS_CFG
echo "cDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG
echo "httpDebugFlag 135" >> $TAOS_CFG echo "httpDebugFlag 135" >> $TAOS_CFG
......
...@@ -64,7 +64,7 @@ print ======== step7 ...@@ -64,7 +64,7 @@ print ======== step7
$lastRows = $data00 $lastRows = $data00
print ======== loop Times $x print ======== loop Times $x
if $x < 2 then if $x < 5 then
$x = $x + 1 $x = $x + 1
goto loop goto loop
endi endi
......
...@@ -75,7 +75,7 @@ print ======== step8 ...@@ -75,7 +75,7 @@ print ======== step8
$lastRows = $data00 $lastRows = $data00
print ======== loop Times $x print ======== loop Times $x
if $x < 2 then if $x < 5 then
$x = $x + 1 $x = $x + 1
goto loop goto loop
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册