提交 74b54610 编写于 作者: H hjxilinx

[td-32] fix bugs in insertion and retrieve data

上级 19025969
...@@ -18,9 +18,10 @@ ...@@ -18,9 +18,10 @@
#define _XOPEN_SOURCE #define _XOPEN_SOURCE
#include "hash.h"
#include "os.h" #include "os.h"
#include "tscSecondaryMerge.h"
#include "hash.h"
//#include "tscSecondaryMerge.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
...@@ -32,6 +33,8 @@ ...@@ -32,6 +33,8 @@
#include "tstoken.h" #include "tstoken.h"
#include "ttime.h" #include "ttime.h"
#include "dataformat.h"
enum { enum {
TSDB_USE_SERVER_TS = 0, TSDB_USE_SERVER_TS = 0,
TSDB_USE_CLI_TS = 1, TSDB_USE_CLI_TS = 1,
...@@ -393,7 +396,6 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start ...@@ -393,7 +396,6 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error, int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
int16_t timePrec, int32_t *code, char *tmpTokenBuf) { int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0; int32_t index = 0;
// bool isPrevOptr; //fang, never used
SSQLToken sToken = {0}; SSQLToken sToken = {0};
char * payload = pDataBlocks->pData + pDataBlocks->size; char * payload = pDataBlocks->pData + pDataBlocks->size;
...@@ -604,8 +606,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 ...@@ -604,8 +606,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
pBlocks->sid = pTableMeta->sid; pBlocks->tid = pTableMeta->sid;
pBlocks->uid = pTableMeta->uid; pBlocks->uid = pTableMeta->uid;
pBlocks->sversion = pTableMeta->sversion; pBlocks->sversion = pTableMeta->sversion;
pBlocks->numOfRows += numOfRows; pBlocks->numOfRows += numOfRows;
...@@ -613,10 +615,10 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableM ...@@ -613,10 +615,10 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableM
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) { void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData; SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet. // size is less than the total size, since duplicated rows may be removed yet.
assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size); assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size);
// if use server time, this block must be ordered // if use server time, this block must be ordered
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) { if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
...@@ -624,7 +626,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { ...@@ -624,7 +626,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
} }
if (!dataBuf->ordered) { if (!dataBuf->ordered) {
char *pBlockData = pBlocks->payLoad; char *pBlockData = pBlocks->data;
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0; int32_t i = 0;
...@@ -650,7 +652,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { ...@@ -650,7 +652,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
dataBuf->ordered = true; dataBuf->ordered = true;
pBlocks->numOfRows = i + 1; pBlocks->numOfRows = i + 1;
dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows; dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows;
} }
} }
...@@ -663,7 +665,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -663,7 +665,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SShellSubmitBlock), tinfo.rowSize, pTableMetaInfo->name, sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name,
pTableMeta, &dataBuf); pTableMeta, &dataBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
...@@ -691,11 +693,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -691,11 +693,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
SParamInfo *param = dataBuf->params + i; SParamInfo *param = dataBuf->params + i;
if (param->idx == -1) { if (param->idx == -1) {
param->idx = pCmd->numOfParams++; param->idx = pCmd->numOfParams++;
param->offset -= sizeof(SShellSubmitBlock); param->offset -= sizeof(SSubmitBlk);
} }
} }
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
dataBuf->vgId = pTableMeta->vgId; dataBuf->vgId = pTableMeta->vgId;
...@@ -1141,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1141,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
STableDataBlocks *pDataBlock = NULL; STableDataBlocks *pDataBlock = NULL;
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SShellSubmitBlock), pTableMetaInfo->name, int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
pTableMeta, &pDataBlock); pTableMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto _error_clean; goto _error_clean;
...@@ -1353,7 +1355,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1353,7 +1355,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
...@@ -1394,7 +1396,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1394,7 +1396,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = tscCreateBlockArrayList();
STableDataBlocks *pTableDataBlock = NULL; STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock), int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk),
pTableMetaInfo->name, pTableMeta, &pTableDataBlock); pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return -1; return -1;
...@@ -1435,7 +1437,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1435,7 +1437,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
} }
pTableDataBlock = pCmd->pDataBlocks->pData[0]; pTableDataBlock = pCmd->pDataBlocks->pData[0];
pTableDataBlock->size = sizeof(SShellSubmitBlock); pTableDataBlock->size = sizeof(SSubmitBlk);
pTableDataBlock->rowSize = tinfo.rowSize; pTableDataBlock->rowSize = tinfo.rowSize;
numOfRows += pSql->res.numOfRows; numOfRows += pSql->res.numOfRows;
......
...@@ -325,12 +325,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { ...@@ -325,12 +325,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
uint32_t dataSize = totalDataSize / alloced; uint32_t dataSize = totalDataSize / alloced;
assert(dataSize * alloced == totalDataSize); assert(dataSize * alloced == totalDataSize);
if (alloced == binded) { if (alloced == binded) {
totalDataSize += dataSize + sizeof(SShellSubmitBlock); totalDataSize += dataSize + sizeof(SSubmitBlk);
if (totalDataSize > pBlock->nAllocSize) { if (totalDataSize > pBlock->nAllocSize) {
const double factor = 1.5; const double factor = 1.5;
void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor)); void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor));
...@@ -342,7 +342,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { ...@@ -342,7 +342,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
} }
} }
char* data = pBlock->pData + sizeof(SShellSubmitBlock) + dataSize * binded; char* data = pBlock->pData + sizeof(SSubmitBlk) + dataSize * binded;
for (uint32_t j = 0; j < pBlock->numOfParams; ++j) { for (uint32_t j = 0; j < pBlock->numOfParams; ++j) {
SParamInfo* param = pBlock->params + j; SParamInfo* param = pBlock->params + j;
int code = doBindParam(data, param, bind + param->idx); int code = doBindParam(data, param, bind + param->idx);
...@@ -365,10 +365,10 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { ...@@ -365,10 +365,10 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size += totalDataSize / alloced; pBlock->size += totalDataSize / alloced;
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData; SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
pSubmit->numOfRows += pSubmit->numOfRows / alloced; pSubmit->numOfRows += pSubmit->numOfRows / alloced;
} }
...@@ -398,10 +398,10 @@ static int insertStmtReset(STscStmt* pStmt) { ...@@ -398,10 +398,10 @@ static int insertStmtReset(STscStmt* pStmt) {
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i];
uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size = sizeof(SShellSubmitBlock) + totalDataSize / alloced; pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced;
SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData; SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData;
pSubmit->numOfRows = pSubmit->numOfRows / alloced; pSubmit->numOfRows = pSubmit->numOfRows / alloced;
} }
} }
......
...@@ -58,7 +58,7 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) { ...@@ -58,7 +58,7 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
return pSTableMeta->schema; return pSTableMeta->schema;
} }
return pTableMeta->schema; return (SSchema*) pTableMeta->schema;
} }
SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) { SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
......
...@@ -508,14 +508,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -508,14 +508,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->free = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type);
pSql->cmd.payloadLen = pMsg - pStart; pRetrieveMsg->header.vgId = htonl(1);
pMsg += sizeof(SRetrieveTableMsg);
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
//SShellSubmitMsg *pShellMsg; //SSubmitMsg *pShellMsg;
//char * pMsg; //char * pMsg;
//STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); //STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
...@@ -524,14 +527,14 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { ...@@ -524,14 +527,14 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) {
//pMsg = buf + tsRpcHeadSize; //pMsg = buf + tsRpcHeadSize;
//TODO set iplist //TODO set iplist
//pShellMsg = (SShellSubmitMsg *)pMsg; //pShellMsg = (SSubmitMsg *)pMsg;
//pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode); //pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode);
//tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip), //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip),
// htons(pShellMsg->vnode)); // htons(pShellMsg->vnode));
} }
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SShellSubmitMsg *pShellMsg; SSubmitMsg *pShellMsg;
char * pMsg, *pStart; char * pMsg, *pStart;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
...@@ -539,24 +542,23 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -539,24 +542,23 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pStart = pSql->cmd.payload + tsRpcHeadSize; pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart; pMsg = pStart;
pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->desc.numOfVnodes = htonl(1); SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
pMsgDesc->numOfVnodes = htonl(1); //set the number of vnodes
pMsg += sizeof(SMsgDesc);
pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); pShellMsg = (SSubmitMsg *)pMsg;
pShellMsg->header.vgId = htonl(pTableMeta->vgId); pShellMsg->header.vgId = htonl(pTableMeta->vgId);
pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen); pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pShellMsg->length = pShellMsg->header.contLen;
pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted
// pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
// tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip), // tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip),
// htons(pShellMsg->vnode)); // htons(pShellMsg->vnode));
// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -147,7 +147,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -147,7 +147,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
} }
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pObj; return pObj;
} }
......
...@@ -695,6 +695,49 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, ...@@ -695,6 +695,49 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
int32_t firstPartLen = 0;
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
pDataBlock += sizeof(SSubmitBlk);
int32_t total = sizeof(int32_t)*2;
for(int32_t i = 0; i < tinfo.numOfColumns; ++i) {
switch (pSchema[i].type) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
assert(0); // not support binary yet
firstPartLen += sizeof(int32_t);break;
}
default:
firstPartLen += tDataTypeDesc[pSchema[i].type].nSize;
total += tDataTypeDesc[pSchema[i].type].nSize;
}
}
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
SSubmitBlk* pBlock = pTableDataBlock->pData;
int32_t rows = htons(pBlock->numOfRows);
for(int32_t i = 0; i < rows; ++i) {
*(int32_t*) pDataBlock = total;
pDataBlock += sizeof(int32_t);
*(int32_t*) pDataBlock = firstPartLen;
pDataBlock += sizeof(int32_t);
memcpy(pDataBlock, p, pTableDataBlock->rowSize);
p += pTableDataBlock->rowSize;
pDataBlock += pTableDataBlock->rowSize;
}
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
...@@ -716,7 +759,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -716,7 +759,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
return ret; return ret;
} }
int64_t destSize = dataBuf->size + pOneTableBlock->size; int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2;
if (dataBuf->nAllocSize < destSize) { if (dataBuf->nAllocSize < destSize) {
while (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) {
dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5; dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5;
...@@ -730,29 +773,33 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -730,29 +773,33 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize); tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
tfree(dataBuf->pData);
tscDestroyBlockArrayList(pVnodeDataBlockList); tscDestroyBlockArrayList(pVnodeDataBlockList);
tfree(dataBuf->pData);
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
} }
SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
sortRemoveDuplicates(pOneTableBlock); sortRemoveDuplicates(pOneTableBlock);
char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid, tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e)); pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e));
pBlocks->sid = htonl(pBlocks->sid); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
pBlocks->tid = htonl(pBlocks->tid);
pBlocks->uid = htobe64(pBlocks->uid); pBlocks->uid = htobe64(pBlocks->uid);
pBlocks->sversion = htonl(pBlocks->sversion); pBlocks->sversion = htonl(pBlocks->sversion);
pBlocks->numOfRows = htons(pBlocks->numOfRows); pBlocks->numOfRows = htons(pBlocks->numOfRows);
memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size); pBlocks->len = htonl(len);
dataBuf->size += pOneTableBlock->size; // erase the empty space reserved for binary data
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock);
dataBuf->size += (len + sizeof(SSubmitBlk));
dataBuf->numOfTables += 1; dataBuf->numOfTables += 1;
} }
......
...@@ -77,7 +77,9 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -77,7 +77,9 @@ void dnodeRead(SRpcMsg *pMsg) {
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
SRpcContext *pRpcContext = NULL; SRpcContext *pRpcContext = NULL;
dTrace("dnode read msg disposal");
// SMsgDesc *pDesc = pCont; // SMsgDesc *pDesc = pCont;
// pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); // pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
// pCont += sizeof(SMsgDesc); // pCont += sizeof(SMsgDesc);
...@@ -229,7 +231,8 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -229,7 +231,8 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL; SQInfo* pQInfo = NULL;
int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo); void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
...@@ -243,17 +246,17 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -243,17 +246,17 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
.msgType = 0 .msgType = 0
}; };
rpcSendResponse(&rpcRsp);
// do execute query // do execute query
qTableQuery(pQInfo); qTableQuery(pQInfo);
rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = htobe64(pRetrieve->qhandle); void *pQInfo = htobe64(pRetrieve->qhandle);
dTrace("retrieve msg is disposed, qInfo:%p", pQInfo); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t rowSize = 0; int32_t rowSize = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
...@@ -284,11 +287,12 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -284,11 +287,12 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
contLen = 100; contLen = 100;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen);
pRsp->numOfRows = 0; pRsp->numOfRows = htonl(1);
pRsp->precision = 0; pRsp->precision = htons(0);
pRsp->offset = 0; pRsp->offset = htobe64(0);
pRsp->useconds = 0; pRsp->useconds = htobe64(0);
// todo set the data
*(int64_t*) pRsp->data = 1000; *(int64_t*) pRsp->data = 1000;
rpcRsp = (SRpcMsg) { rpcRsp = (SRpcMsg) {
......
...@@ -276,7 +276,10 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { ...@@ -276,7 +276,10 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
pRsp->affectedRows = htonl(1); pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0; pRsp->numOfFailedBlocks = 0;
// todo write to tsdb void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
assert(tsdb != NULL);
tsdbInsertData(tsdb, pMsg->pCont);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
...@@ -285,6 +288,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { ...@@ -285,6 +288,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
.code = 0, .code = 0,
.msgType = 0 .msgType = 0
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
......
...@@ -188,14 +188,14 @@ extern char *taosMsg[]; ...@@ -188,14 +188,14 @@ extern char *taosMsg[];
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { //typedef struct {
int32_t vnode; // int32_t vnode;
int32_t sid; // int32_t sid;
int32_t sversion; // int32_t sversion;
uint64_t uid; // uint64_t uid;
int16_t numOfRows; // int16_t numOfRows;
char payLoad[]; // char payLoad[];
} SShellSubmitBlock; //} SShellSubmitBlock;
typedef struct { typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
...@@ -206,13 +206,33 @@ typedef struct SMsgHead { ...@@ -206,13 +206,33 @@ typedef struct SMsgHead {
int32_t vgId; int32_t vgId;
} SMsgHead; } SMsgHead;
typedef struct { //typedef struct {
SMsgDesc desc; // SMsgDesc desc;
SMsgHead header; // SMsgHead header;
int16_t import; // int16_t import;
int32_t numOfTables; // total number of sid // int32_t numOfTables; // total number of sid
char blks[]; // number of data blocks, each table has at least one data block // char blks[]; // number of data blocks, each table has at least one data block
} SShellSubmitMsg; //} SShellSubmitMsg;
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
// Submit message for this TSDB
typedef struct SSubmitMsg {
SMsgHead header;
int32_t length;
int32_t compressed:2;
int32_t numOfBlocks:30;
SSubmitBlk blocks[];
} SSubmitMsg;
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
...@@ -506,14 +526,16 @@ typedef struct { ...@@ -506,14 +526,16 @@ typedef struct {
} SQueryTableRsp; } SQueryTableRsp;
typedef struct { typedef struct {
SMsgHead header;
uint64_t qhandle; uint64_t qhandle;
uint16_t free; uint16_t free;
} SRetrieveTableMsg; } SRetrieveTableMsg;
typedef struct { typedef struct SRetrieveTableRsp {
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client
int16_t precision; int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
char data[]; char data[];
} SRetrieveTableRsp; } SRetrieveTableRsp;
......
...@@ -171,6 +171,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -171,6 +171,7 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo { typedef struct SQInfo {
uint64_t signature; uint64_t signature;
void* pVnode;
TSKEY startTime; TSKEY startTime;
int64_t elapsedTime; int64_t elapsedTime;
SResultRec rec; SResultRec rec;
...@@ -205,7 +206,7 @@ typedef struct SQInfo { ...@@ -205,7 +206,7 @@ typedef struct SQInfo {
* @param pQInfo * @param pQInfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo);
/** /**
* query on single table * query on single table
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
*/ */
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h" #include "hashfunc.h"
#include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tscompression.h" #include "tscompression.h"
...@@ -1482,10 +1482,11 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i ...@@ -1482,10 +1482,11 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i
} }
} }
static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) { static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order,
bool isSTableQuery) {
dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv));
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo));
pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx)); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx));
...@@ -1890,7 +1891,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { ...@@ -1890,7 +1891,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0; pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0;
} }
// pQuery->pointsOffset = pQuery->pointsToRead; // pQuery->pointsOffset = pQuery->pointsToRead;
} }
...@@ -2242,7 +2243,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { ...@@ -2242,7 +2243,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) {
size_t s = taosArrayGetSize(pQInfo->pTableIdList); size_t s = taosArrayGetSize(pQInfo->pTableIdList);
num = MAX(s, INITIAL_RESULT_ROWS_VALUE); num = MAX(s, INITIAL_RESULT_ROWS_VALUE);
} else { // for super table query, one page for each subset } else { // for super table query, one page for each subset
// num = pQInfo->pSidSet->numOfSubSet; // num = pQInfo->pSidSet->numOfSubSet;
} }
assert(num > 0); assert(num > 0);
...@@ -2289,7 +2290,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { ...@@ -2289,7 +2290,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
// tSidSetDestroy(&pQInfo->pSidSet); // tSidSetDestroy(&pQInfo->pSidSet);
if (pQInfo->pTableDataInfo != NULL) { if (pQInfo->pTableDataInfo != NULL) {
// size_t num = taosHashGetSize(pQInfo->pTableIdList); // size_t num = taosHashGetSize(pQInfo->pTableIdList);
for (int32_t j = 0; j < 0; ++j) { for (int32_t j = 0; j < 0; ++j) {
destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols);
} }
...@@ -2328,7 +2329,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -2328,7 +2329,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
pQuery->lastKey = pQuery->window.skey; pQuery->lastKey = pQuery->window.skey;
// create runtime environment // create runtime environment
// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel;
// get one queried meter // get one queried meter
assert(0); assert(0);
...@@ -2388,7 +2389,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -2388,7 +2389,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
taosArrayPush(cols, &pQuery->colList[i]); taosArrayPush(cols, &pQuery->colList[i]);
} }
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(&cond, sa, cols); pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(NULL, &cond, sa, cols);
// metric query do not invoke interpolation, it will be done at the second-stage merge // metric query do not invoke interpolation, it will be done at the second-stage merge
if (!isPointInterpoQuery(pQuery)) { if (!isPointInterpoQuery(pQuery)) {
...@@ -2409,7 +2410,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { ...@@ -2409,7 +2410,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) {
*/ */
void vnodeDecMeterRefcnt(SQInfo *pQInfo) { void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
if (pQInfo != NULL) { if (pQInfo != NULL) {
// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); // assert(taosHashGetSize(pQInfo->pTableIdList) >= 1);
} }
#if 0 #if 0
...@@ -2587,23 +2588,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2587,23 +2588,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfPoints; int numOfPoints;
TSKEY *keyList; TSKEY *keyList;
if (num <= 0) return -1; if (num <= 0) return -1;
keyList = (TSKEY *)pValue; keyList = (TSKEY *)pValue;
firstPos = 0; firstPos = 0;
lastPos = num - 1; lastPos = num - 1;
if (order == 0) { if (order == 0) {
// find the first position which is smaller than the key // find the first position which is smaller than the key
while (1) { while (1) {
if (key >= keyList[lastPos]) return lastPos; if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos; if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1; if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1; numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
} else if (key > keyList[midPos]) { } else if (key > keyList[midPos]) {
...@@ -2612,13 +2613,13 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2612,13 +2613,13 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
break; break;
} }
} }
} else { } else {
// find the first position which is bigger than the key // find the first position which is bigger than the key
while (1) { while (1) {
if (key <= keyList[firstPos]) return firstPos; if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos; if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) { if (key > keyList[lastPos]) {
lastPos = lastPos + 1; lastPos = lastPos + 1;
if (lastPos >= num) if (lastPos >= num)
...@@ -2626,10 +2627,10 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2626,10 +2627,10 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
else else
return lastPos; return lastPos;
} }
numOfPoints = lastPos - firstPos + 1; numOfPoints = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfPoints >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
} else if (key > keyList[midPos]) { } else if (key > keyList[midPos]) {
...@@ -2639,7 +2640,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2639,7 +2640,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
} }
} }
} }
return midPos; return midPos;
} }
...@@ -2648,65 +2649,68 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2648,65 +2649,68 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t cnt = 0; int64_t cnt = 0;
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
// check if query is killed or not set the status of query to pass the status check // check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return cnt; return cnt;
} }
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) { if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) {
TSKEY skey1, ekey1; TSKEY skey1, ekey1;
STimeWindow w = {0}; STimeWindow w = {0};
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (QUERY_IS_ASC_QUERY(pQuery)) { if (QUERY_IS_ASC_QUERY(pQuery)) {
doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w); doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey,
&skey1, &ekey1, &w);
pWindowResInfo->startTime = w.skey; pWindowResInfo->startTime = w.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
} else { } else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime; TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime;
doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1,
&w);
pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->startTime = pQuery->window.skey;
pWindowResInfo->prevSKey = w.skey; pWindowResInfo->prevSKey = w.skey;
} }
} }
int32_t numOfRes = 0; int32_t numOfRes = 0;
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis);
int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes,
&pRuntimeEnv->windowResInfo, pDataBlock); &pRuntimeEnv->windowResInfo, pDataBlock);
// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d", // dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d,
// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot, // checked:%d",
// pQuery->pos, blockInfo.size, forwardStep); // GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId,
// pQueryHandle->cur.slot, pQuery->pos, blockInfo.size, forwardStep);
// save last access position // save last access position
cnt += forwardStep; cnt += forwardStep;
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
break; break;
} }
} }
// if the result buffer is not full, set the query completed flag // if the result buffer is not full, set the query completed flag
if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
setQueryStatus(pQuery, QUERY_COMPLETED); setQueryStatus(pQuery, QUERY_COMPLETED);
} }
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? QUERY_ASC_FORWARD_STEP:QUERY_DESC_FORWARD_STEP; int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step); removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1;
...@@ -2714,7 +2718,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2714,7 +2718,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
} }
} }
return cnt; return cnt;
} }
...@@ -2976,29 +2980,30 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) { ...@@ -2976,29 +2980,30 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { // while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) {
// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; // int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx];
// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; // int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1];
// //
// assert(0); // assert(0);
// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); // // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end);
// if (ret < 0) { // not enough disk space to save the data into disk // if (ret < 0) { // not enough disk space to save the data into disk
// return -1; // return -1;
// } // }
// //
// pQInfo->subgroupIdx += 1; // pQInfo->subgroupIdx += 1;
// //
// // this group generates at least one result, return results // // this group generates at least one result, return results
// if (ret > 0) { // if (ret > 0) {
// break; // break;
// } // }
// //
// assert(pQInfo->numOfGroupResultPages == 0); // assert(pQInfo->numOfGroupResultPages == 0);
// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); // dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1);
// } // }
// //
// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), // dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms",
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); // GET_QINFO_ADDR(pQuery),
// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3013,10 +3018,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { ...@@ -3013,10 +3018,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
} }
// set current query completed // set current query completed
// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) {
// pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables;
// return; // return;
// } // }
} }
SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv;
...@@ -3923,7 +3928,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { ...@@ -3923,7 +3928,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else { } else {
// totalSubset = pQInfo->pSidSet->numOfSubSet; // totalSubset = pQInfo->pSidSet->numOfSubSet;
} }
return totalSubset; return totalSubset;
...@@ -4303,7 +4308,7 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { ...@@ -4303,7 +4308,7 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
#endif #endif
} }
int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param, void* tsdb) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -4327,8 +4332,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { ...@@ -4327,8 +4332,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
pQuery->lastKey = pQuery->window.skey; pQuery->lastKey = pQuery->window.skey;
STsdbQueryCond cond = {0}; STsdbQueryCond cond = {0};
cond.twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey};
cond.order = pQuery->order.order; cond.order = pQuery->order.order;
cond.colList = *pQuery->colList; cond.colList = *pQuery->colList;
...@@ -4337,7 +4342,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { ...@@ -4337,7 +4342,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) {
taosArrayPush(cols, &pQuery->colList[i]); taosArrayPush(cols, &pQuery->colList[i]);
} }
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols);
pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pQuery = pQuery;
...@@ -5248,41 +5254,42 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { ...@@ -5248,41 +5254,42 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
// pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned);
} }
void qTableQuery(SQInfo* pQInfo) { void qTableQuery(SQInfo *pQInfo) {
assert(pQInfo != NULL); assert(pQInfo != NULL);
if (pQInfo->killed) { if (pQInfo->killed) {
dTrace("QInfo:%p it is already killed, abort", pQInfo); dTrace("QInfo:%p it is already killed, abort", pQInfo);
return; return;
} }
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery * pQuery = pRuntimeEnv->pQuery;
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); // dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo);
if (vnodeHasRemainResults(pQInfo)) { if (vnodeHasRemainResults(pQInfo)) {
/* /*
* There are remain results that are not returned due to result interpolation * There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results. * So, we do keep in this procedure instead of launching retrieve procedure for next results.
*/ */
int32_t numOfInterpo = 0; int32_t numOfInterpo = 0;
int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo);
pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata,
(tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo);
doRevisedResultsByLimit(pQInfo); doRevisedResultsByLimit(pQInfo);
pQInfo->pointsInterpo += numOfInterpo; pQInfo->pointsInterpo += numOfInterpo;
pQInfo->rec.pointsRead += pQuery->rec.pointsRead; pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d",
// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); // pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo,
// pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
return; return;
} }
// here we have scan all qualified data in both data file and cache // here we have scan all qualified data in both data file and cache
if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
// continue to get push data from the group result // continue to get push data from the group result
...@@ -5291,66 +5298,68 @@ void qTableQuery(SQInfo* pQInfo) { ...@@ -5291,66 +5298,68 @@ void qTableQuery(SQInfo* pQInfo) {
// todo limit the output for interval query? // todo limit the output for interval query?
pQuery->rec.pointsRead = 0; pQuery->rec.pointsRead = 0;
pQInfo->subgroupIdx = 0; // always start from 0 pQInfo->subgroupIdx = 0; // always start from 0
if (pRuntimeEnv->windowResInfo.size > 0) { if (pRuntimeEnv->windowResInfo.size > 0) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
pQInfo->rec.pointsRead += pQuery->rec.pointsRead; pQInfo->rec.pointsRead += pQuery->rec.pointsRead;
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx);
if (pQuery->rec.pointsRead > 0) { if (pQuery->rec.pointsRead > 0) {
// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d
// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, // totalReturn:%d",
// pQInfo->pointsInterpo, pQInfo->pointsReturned); // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead,
// pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
return; return;
} }
} }
} }
// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode,
// pMeterObj->meterId, pQInfo->pointsRead); // pMeterObj->sid,
// pMeterObj->meterId, pQInfo->pointsRead);
// vnodePrintQueryStatistics(pSupporter);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
return; return;
} }
// number of points returned during this query // number of points returned during this query
pQuery->rec.pointsRead = 0; pQuery->rec.pointsRead = 0;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); // assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead);
tableIntervalProcessor(pQInfo); tableIntervalProcessor(pQInfo);
} else { } else {
if (isFixedOutputQuery(pQuery)) { if (isFixedOutputQuery(pQuery)) {
assert(pQuery->checkBufferInLoop == 0); assert(pQuery->checkBufferInLoop == 0);
tableFixedOutputProcessor(pQInfo); tableFixedOutputProcessor(pQInfo);
} else { // diff/add/multiply/subtract/division } else { // diff/add/multiply/subtract/division
assert(pQuery->checkBufferInLoop == 1); assert(pQuery->checkBufferInLoop == 1);
tableMultiOutputProcessor(pQInfo); tableMultiOutputProcessor(pQInfo);
} }
} }
// record the total elapsed time // record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st); pQInfo->elapsedTime += (taosGetTimestampUs() - st);
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
// pQInfo->over = 1; // pQInfo->over = 1;
} else { } else {
// dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, // dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo,
// pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); // pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo); // vnodeDecRefCount(pQInfo);
} }
void qSuperTableQuery(void *pReadMsg) { void qSuperTableQuery(void *pReadMsg) {
...@@ -5460,10 +5469,10 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { ...@@ -5460,10 +5469,10 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
return 0; return 0;
} }
static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) { static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList) {
pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId); pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId);
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey); pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey);
...@@ -5483,12 +5492,12 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa ...@@ -5483,12 +5492,12 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa
pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit); pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit);
pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset); pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset);
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset); pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen); pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks); pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks);
pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder); pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder);
// query msg safety check // query msg safety check
if (validateQueryMeterMsg(pQueryTableMsg) != 0) { if (validateQueryMeterMsg(pQueryTableMsg) != 0) {
return TSDB_CODE_INVALID_QUERY_MSG; return TSDB_CODE_INVALID_QUERY_MSG;
...@@ -5590,24 +5599,24 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa ...@@ -5590,24 +5599,24 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa
pQueryTableMsg->colNameList = (int64_t)pMsg; pQueryTableMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryTableMsg->colNameLen; pMsg += pQueryTableMsg->colNameLen;
} }
*pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo)); *pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo));
STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key); pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo); taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) { for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->sid = htonl(pTableIdInfo->sid);
pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid);
pTableIdInfo->key = htobe64(pTableIdInfo->key); pTableIdInfo->key = htobe64(pTableIdInfo->key);
taosArrayPush(*pTableIdList, pTableIdInfo); taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo); pMsg += sizeof(STableIdInfo);
} }
...@@ -5710,7 +5719,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs ...@@ -5710,7 +5719,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) { static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) {
*pSqlFuncExpr = NULL; *pSqlFuncExpr = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols); SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols);
if (pExprs == NULL) { if (pExprs == NULL) {
tfree(pQueryMsg->pSqlFuncExprs); tfree(pQueryMsg->pSqlFuncExprs);
...@@ -5752,11 +5761,11 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct ...@@ -5752,11 +5761,11 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct
type = TSDB_DATA_TYPE_DOUBLE; type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypeDesc[type].nSize; bytes = tDataTypeDesc[type].nSize;
} else { // parse the normal column } else { // parse the normal column
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase);
assert(j < pQueryMsg->numOfCols); assert(j < pQueryMsg->numOfCols);
SColumnInfo* pCol = &pQueryMsg->colList[j]; SColumnInfo *pCol = &pQueryMsg->colList[j];
type = pCol->type; type = pCol->type;
bytes = pCol->bytes; bytes = pCol->bytes;
} }
...@@ -5909,7 +5918,8 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { ...@@ -5909,7 +5918,8 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) { static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs,
SArray *pTableIdList) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) { if (pQInfo == NULL) {
goto _clean_memory; goto _clean_memory;
...@@ -5944,20 +5954,20 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -5944,20 +5954,20 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
goto _clean_memory; goto _clean_memory;
} }
for (int16_t i = 0; i < numOfCols; ++i) { for (int16_t i = 0; i < numOfCols; ++i) {
pQuery->colList[i].info = pQueryMsg->colList[i]; pQuery->colList[i].info = pQueryMsg->colList[i];
// SColumnInfo *pColInfo = &pQuery->colList[i].data; // SColumnInfo *pColInfo = &pQuery->colList[i].data;
// pColInfo->filters = NULL; // pColInfo->filters = NULL;
// if (colList[i].numOfFilters > 0) { // if (colList[i].numOfFilters > 0) {
// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); // pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo));
// //
// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) { // for (int32_t j = 0; j < colList[i].numOfFilters; ++j) {
// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]); // tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]);
// } // }
// } else { // } else {
// pQuery->colList[i].data.filters = NULL; // pQuery->colList[i].data.filters = NULL;
// } // }
} }
// calculate the result row size // calculate the result row size
...@@ -6003,9 +6013,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou ...@@ -6003,9 +6013,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = (uint64_t)pQInfo; pQInfo->signature = (uint64_t)pQInfo;
pQInfo->pTableIdList = pTableIdList; pQInfo->pTableIdList = pTableIdList;
pQuery->pos = -1; pQuery->pos = -1;
// dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo); // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
// pQInfo);
return pQInfo; return pQInfo;
...@@ -6035,7 +6046,7 @@ bool isQInfoValid(void *param) { ...@@ -6035,7 +6046,7 @@ bool isQInfoValid(void *param) {
if (pQInfo == NULL) { if (pQInfo == NULL) {
return false; return false;
} }
/* /*
* pQInfo->signature may be changed by another thread, so we assign value of signature * pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable * into local variable, then compare by using local variable
...@@ -6044,75 +6055,75 @@ bool isQInfoValid(void *param) { ...@@ -6044,75 +6055,75 @@ bool isQInfoValid(void *param) {
return (sig == (uint64_t)pQInfo); return (sig == (uint64_t)pQInfo);
} }
void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) { void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) {
if (!isQInfoValid(pQInfo)) { if (!isQInfoValid(pQInfo)) {
return; return;
} }
pQInfo->killed = 1; pQInfo->killed = 1;
dTrace("QInfo:%p start to free SQInfo", pQInfo); dTrace("QInfo:%p start to free SQInfo", pQInfo);
if (decQueryRef) { if (decQueryRef) {
vnodeDecMeterRefcnt(pQInfo); vnodeDecMeterRefcnt(pQInfo);
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int col = 0; col < pQuery->numOfOutputCols; ++col) { for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
// for (int col = 0; col < pQuery->numOfCols; ++col) { // for (int col = 0; col < pQuery->numOfCols; ++col) {
// vnodeFreeColumnInfo(&pQuery->colList[col].data); // vnodeFreeColumnInfo(&pQuery->colList[col].data);
// } // }
// //
// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { // if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
// tfree(pQuery->tsData); // tfree(pQuery->tsData);
// } // }
sem_destroy(&(pQInfo->dataReady)); sem_destroy(&(pQInfo->dataReady));
vnodeQueryFreeQInfoEx(pQInfo); vnodeQueryFreeQInfoEx(pQInfo);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) { if (pColFilter->numOfFilters > 0) {
tfree(pColFilter->pFilters); tfree(pColFilter->pFilters);
} }
} }
tfree(pQuery->pFilterInfo); tfree(pQuery->pFilterInfo);
tfree(pQuery->colList); tfree(pQuery->colList);
tfree(pQuery->sdata); tfree(pQuery->sdata);
if (pQuery->pSelectExpr != NULL) { if (pQuery->pSelectExpr != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
if (pBinExprInfo->numOfCols > 0) { if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns); tfree(pBinExprInfo->pReqColumns);
tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL);
} }
} }
tfree(pQuery->pSelectExpr); tfree(pQuery->pSelectExpr);
} }
if (pQuery->defaultVal != NULL) { if (pQuery->defaultVal != NULL) {
tfree(pQuery->defaultVal); tfree(pQuery->defaultVal);
} }
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery); tfree(pQuery);
// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); // dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId);
//destroy signature, in order to avoid the query process pass the object safety check // destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo)); memset(pQInfo, 0, sizeof(SQInfo));
tfree(pQInfo); tfree(pQInfo);
} }
static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SArray* pTableIdList, SQInfo **pQInfo) { SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList);
...@@ -6121,16 +6132,12 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE ...@@ -6121,16 +6132,12 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
goto _error; goto _error;
} }
SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery; SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery;
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo;
// if (pTableIdList != NULL && pTableIdList[0]->key > 0) {
// pQuery->window.skey = pTableIdList[0]->key;
// } else {
pQuery->window.skey = pQueryMsg->window.skey; pQuery->window.skey = pQueryMsg->window.skey;
pQuery->window.ekey = pQueryMsg->window.ekey; pQuery->window.ekey = pQueryMsg->window.ekey;
pQuery->lastKey = pQuery->window.skey; pQuery->lastKey = pQuery->window.skey;
if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) {
...@@ -6151,7 +6158,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE ...@@ -6151,7 +6158,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
tsBufNextPos(pTSBuf); tsBufNextPos(pTSBuf);
} }
if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf)) != TSDB_CODE_SUCCESS) { if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -6163,18 +6170,18 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE ...@@ -6163,18 +6170,18 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
// dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount); // pQInfo->refCount);
return code; return code;
_error: _error:
// table query ref will be decrease during error handling // table query ref will be decrease during error handling
vnodeFreeQInfo(*pQInfo, false); vnodeFreeQInfo(*pQInfo, false);
return code; return code;
} }
int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL); assert(pQueryTableMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SArray* pTableIdList = NULL; SArray *pTableIdList = NULL;
if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) { if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -6205,14 +6212,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { ...@@ -6205,14 +6212,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) { if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) {
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else { } else {
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo); code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
} }
_query_over: _query_over:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pTableIdList); taosArrayDestroy(pTableIdList);
} }
// if failed to add ref for all meters in this query, abort current query // if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) { // if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber); // vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
...@@ -6231,31 +6238,31 @@ _query_over: ...@@ -6231,31 +6238,31 @@ _query_over:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) { int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) {
if (pQInfo == NULL || !isQInfoValid(pQInfo)) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE; return TSDB_CODE_INVALID_QHANDLE;
} }
if (pQInfo->killed) { if (pQInfo->killed) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) { if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED; return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client } else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return abs(pQInfo->code); return abs(pQInfo->code);
} }
} }
sem_wait(&pQInfo->dataReady); sem_wait(&pQInfo->dataReady);
SQuery* pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
// *numOfRows = pQInfo->rec.pointsRead; // *numOfRows = pQInfo->rec.pointsRead;
// *rowsize = pQuery->rowSize; // *rowsize = pQuery->rowSize;
*numOfRows = 1; *numOfRows = 1;
// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, // dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec,
// *rowsize, *numOfRows, pQInfo->code); // *rowsize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed. if (pQInfo->code < 0) { // less than 0 means there are error existed.
return -pQInfo->code; return -pQInfo->code;
} }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
// #include "tsdb.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
...@@ -395,6 +394,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -395,6 +394,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
SSkipListNode *px = pSkipList->pHead; SSkipListNode *px = pSkipList->pHead;
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
bool identical = false;
for (int32_t i = pSkipList->level - 1; i >= 0; --i) { for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i);
while (p != NULL) { while (p != NULL) {
...@@ -402,11 +402,16 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -402,11 +402,16 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode); char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode);
// if the forward element is less than the specified key, forward one step // if the forward element is less than the specified key, forward one step
if (pSkipList->comparFn(key, newDatakey) < 0) { int32_t ret = pSkipList->comparFn(key, newDatakey);
if (ret < 0) {
px = p; px = p;
p = SL_GET_FORWARD_POINTER(px, i); p = SL_GET_FORWARD_POINTER(px, i);
} else { } else {
if (identical == false) {
identical = (ret == 0);
}
break; break;
} }
} }
...@@ -418,17 +423,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -418,17 +423,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) {
} }
// if the skip list does not allowed identical key inserted, the new data will be discarded. // if the skip list does not allowed identical key inserted, the new data will be discarded.
if (pSkipList->keyInfo.dupKey == 0 && forward[0] != pSkipList->pHead) { if (pSkipList->keyInfo.dupKey == 0 && identical) {
char *key = SL_GET_NODE_KEY(pSkipList, forward[0]); if (pSkipList->lock) {
char *pNewDataKey = SL_GET_NODE_KEY(pSkipList, pNode); pthread_rwlock_unlock(pSkipList->lock);
if (pSkipList->comparFn(key, pNewDataKey) == 0) {
if (pSkipList->lock) {
pthread_rwlock_unlock(pSkipList->lock);
}
return forward[0];
} }
return forward[0];
} }
#if SKIP_LIST_RECORD_PERFORMANCE #if SKIP_LIST_RECORD_PERFORMANCE
......
...@@ -88,15 +88,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); ...@@ -88,15 +88,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
// Submit message for one table
typedef struct {
STableId tableId;
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t len; // data part length, not including the SSubmitBlk head
char data[];
} SSubmitBlk;
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
...@@ -106,15 +97,10 @@ typedef struct { ...@@ -106,15 +97,10 @@ typedef struct {
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
// Submit message for this TSDB
typedef struct {
int32_t length;
int32_t compressed;
SSubmitBlk blocks[];
} SSubmitMsg;
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
struct STsdbRepo;
// SSubmitMsg Iterator // SSubmitMsg Iterator
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
...@@ -243,7 +229,7 @@ typedef void *tsdbpos_t; ...@@ -243,7 +229,7 @@ typedef void *tsdbpos_t;
* @param pTableList table sid list * @param pTableList table sid list
* @return * @return
*/ */
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo);
/** /**
* move to next block * move to next block
......
...@@ -63,6 +63,26 @@ typedef struct { ...@@ -63,6 +63,26 @@ typedef struct {
SFileGroup fGroup[]; SFileGroup fGroup[];
} STsdbFileH; } STsdbFileH;
/**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
*/
typedef struct {
int64_t last : 1; // If the block in data file or last file
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
int32_t algorithm : 8; // Compression algorithm
int32_t numOfPoints : 24; // Number of total points
int32_t sversion; // Schema version
int32_t len; // Data block length or nothing
int16_t numOfSubBlocks; // Number of sub-blocks;
int16_t numOfCols;
TSKEY keyFirst;
TSKEY keyLast;
} SCompBlock;
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
......
...@@ -44,7 +44,7 @@ typedef struct { ...@@ -44,7 +44,7 @@ typedef struct {
typedef struct STable { typedef struct STable {
int8_t type; int8_t type;
STableId tableId; STableId tableId;
int32_t superUid; // Super table UID int64_t superUid; // Super table UID
int32_t sversion; int32_t sversion;
STSchema * schema; STSchema * schema;
STSchema * tagSchema; STSchema * tagSchema;
...@@ -98,6 +98,8 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta); ...@@ -98,6 +98,8 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta);
#define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id] #define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id]
#define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */ #define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo);
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
......
...@@ -33,26 +33,6 @@ typedef struct { ...@@ -33,26 +33,6 @@ typedef struct {
int64_t offset; int64_t offset;
} SCompIdx; } SCompIdx;
/**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
*/
typedef struct {
int64_t last : 1; // If the block in data file or last file
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
int32_t algorithm : 8; // Compression algorithm
int32_t numOfPoints : 24; // Number of total points
int32_t sversion; // Schema version
int32_t len; // Data block length or nothing
int16_t numOfSubBlocks; // Number of sub-blocks;
int16_t numOfCols;
TSKEY keyFirst;
TSKEY keyLast;
} SCompBlock;
typedef struct { typedef struct {
int32_t delimiter; // For recovery usage int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
......
...@@ -485,6 +485,10 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -485,6 +485,10 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL || pIter == NULL) return -1; if (pMsg == NULL || pIter == NULL) return -1;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pMsg->compressed = htonl(pMsg->compressed);
pIter->totalLen = pMsg->length; pIter->totalLen = pMsg->length;
pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
...@@ -499,7 +503,15 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { ...@@ -499,7 +503,15 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
SSubmitBlk *pBlock = pIter->pBlock; SSubmitBlk *pBlock = pIter->pBlock;
if (pBlock == NULL) return NULL; if (pBlock == NULL) return NULL;
pBlock->len = htonl(pBlock->len);
pBlock->numOfRows = htons(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len;
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
pIter->pBlock = NULL; pIter->pBlock = NULL;
...@@ -510,6 +522,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { ...@@ -510,6 +522,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
return pBlock; return pBlock;
} }
STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) {
STsdbRepo *tsdb = (STsdbRepo *)pRepo;
return tsdb->tsdbMeta;
}
// Check the configuration and set default options // Check the configuration and set default options
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check precision // Check precision
...@@ -681,6 +698,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -681,6 +698,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
// Copy row into the memory // Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
if (pNode == NULL) { if (pNode == NULL) {
...@@ -694,7 +713,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -694,7 +713,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListPut(pTable->mem->pData, pNode); tSkipListPut(pTable->mem->pData, pNode);
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
pTable->mem->numOfPoints++;
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData);
// pTable->mem->numOfPoints++;
return 0; return 0;
} }
...@@ -702,7 +723,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -702,7 +723,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
if (pTable == NULL) return -1; if (pTable == NULL) return -1;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
......
...@@ -14,53 +14,378 @@ ...@@ -14,53 +14,378 @@
*/ */
#include "os.h" #include "os.h"
#include "tutil.h"
#include "tsdb.h" #include "tsdb.h"
#include "tsdbFile.h"
#include "tsdbMeta.h"
tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { #define EXTRA_BYTES 2
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC)
#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns))
} typedef struct SField {
// todo need the definition
} SField;
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { typedef struct SHeaderFileInfo {
return false; int32_t fileId;
} } SHeaderFileInfo;
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { typedef struct SQueryHandlePos {
int32_t fileId;
int32_t slot;
int32_t pos;
int32_t fileIndex;
} SQueryHandlePos;
} typedef struct SDataBlockLoadInfo {
int32_t fileListIndex;
int32_t fileId;
int32_t slotIdx;
int32_t sid;
SArray *pLoadedCols;
} SDataBlockLoadInfo;
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) { typedef struct SLoadCompBlockInfo {
int32_t sid; /* meter sid */
int32_t fileId;
int32_t fileListIndex;
} SLoadCompBlockInfo;
} typedef struct SQueryFilesInfo {
SArray *pFileInfo;
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { int32_t headerFd; // header file fd
int64_t headerFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct STableQueryRec {
TSKEY lastKey;
STable * pTableObj;
int64_t offsetInHeaderFile;
int32_t numOfBlocks;
int32_t start;
SCompBlock *pBlock;
} STableQueryRec;
typedef struct {
SCompBlock *compBlock;
SField * fields;
} SCompBlockFields;
typedef struct STableDataBlockInfoEx {
SCompBlockFields pBlock;
STableQueryRec * pMeterDataInfo;
int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of meters */
} STableDataBlockInfoEx;
typedef struct STsdbQueryHandle {
struct STsdbRepo* pTsdb;
SQueryHandlePos cur; // current position
SQueryHandlePos start; // the start position, used for secondary/third iteration
int32_t unzipBufSize;
char *unzipBuffer;
char *secondaryUnzipBuffer;
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
uint16_t flag; // denotes reversed scan of data or not
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
TSKEY lastKey;
int32_t blockBufferSize;
SCompBlock *pBlock;
int32_t numOfBlocks;
SField ** pFields;
SArray * pColumns; // column list, SColumnInfoEx array list
SArray * pTableIdList; // table id object list
bool locateStart;
int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek.
STableDataBlockInfoEx *pDataBlockInfoEx;
STableQueryRec * pTableQueryInfo;
int32_t tableIndex;
bool isFirstSlot;
void * qinfo; // query info handle, for debug purpose
} STsdbQueryHandle;
int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
// record the maximum column width among columns of this meter/metric
SColumnInfoEx *pColumn = taosArrayGet(pQueryHandle->pColumns, 0);
int32_t maxColWidth = pColumn->info.bytes;
for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) {
int32_t bytes = pColumn[i].info.bytes;
if (bytes > maxColWidth) {
maxColWidth = bytes;
}
}
// only one unzip buffer required, since we can unzip each column one by one
pQueryHandle->unzipBufSize = (size_t)(maxColWidth * rowsPerFileBlock + EXTRA_BYTES); // plus extra_bytes
pQueryHandle->unzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize);
pQueryHandle->secondaryUnzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize);
if (pQueryHandle->unzipBuffer == NULL || pQueryHandle->secondaryUnzipBuffer == NULL) {
goto _error_clean;
}
return TSDB_CODE_SUCCESS;
_error_clean:
tfree(pQueryHandle->unzipBuffer);
tfree(pQueryHandle->secondaryUnzipBuffer);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) { static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) {
pVnodeFilesInfo->current = -1;
pVnodeFilesInfo->headerFileSize = -1;
pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value
pVnodeFilesInfo->dataFd = FD_INITIALIZER;
pVnodeFilesInfo->lastFd = FD_INITIALIZER;
}
static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) {
pBlockLoadInfo->slotIdx = -1;
pBlockLoadInfo->fileId = -1;
pBlockLoadInfo->sid = -1;
pBlockLoadInfo->fileListIndex = -1;
} }
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) { static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) {
pCompBlockLoadInfo->sid = -1;
pCompBlockLoadInfo->fileId = -1;
pCompBlockLoadInfo->fileListIndex = -1;
}
static int fileOrderComparFn(const void *p1, const void *p2) {
SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1;
SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2;
if (pInfo1->fileId == pInfo2->fileId) {
return 0;
}
return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1;
} }
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) {
return NULL; char suffix[] = ".head";
pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t));
struct dirent *pEntry = NULL;
pVnodeFilesInfo->vnodeId = vnodeId;
char* tsDirectory = "";
sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId);
DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix);
if (pDir == NULL) {
// dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix,
// strerror(errno));
return;
}
while ((pEntry = readdir(pDir)) != NULL) {
if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) {
continue;
}
if (pEntry->d_type & DT_DIR) {
continue;
}
size_t len = strlen(pEntry->d_name);
if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) {
continue;
}
int32_t vid = 0;
int32_t fid = 0;
sscanf(pEntry->d_name, "v%df%d", &vid, &fid);
if (vid != vnodeId) { /* ignore error files */
// dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId);
continue;
}
// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1;
// if (fid > pVnode->fileId || fid < firstFid) {
// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId,
// fid, firstFid, pVnode->fileId);
// continue;
// }
assert(fid >= 0 && vid >= 0);
taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid);
}
closedir(pDir);
// dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles,
// pVnodeFilesInfo->dbFilePathPrefix);
// order the files information according their names */
size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo);
qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn);
} }
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) { tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) {
// todo 1. filter not exist table
// todo 2. add the reference count for each table that is involved in query
STsdbQueryHandle *pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->pTableIdList = idList;
pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true;
pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query
// malloc buffer in order to load data from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
size_t bufferCapacity = 4096;
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoEx));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx *pCol = taosArrayGet(pColumnInfo, i);
SColumnInfoEx pDest = {{0}, 0};
pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes);
pDest.info = pCol->info;
taosArrayPush(pQueryHandle->pColumns, &pDest);
}
if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) {
return NULL;
}
initQueryFileInfoFD(&pQueryHandle->vnodeFileInfo);
vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
int32_t vnodeId = 1;
vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo);
return (tsdb_query_handle_t)pQueryHandle;
} }
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) { static int32_t next = 1;
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
if (next == 0) {
return false;
} else {
next = 0;
return true;
}
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pHandle) {
int numOfRows = 0;
int32_t numOfCols = taosArrayGetSize(pHandle->pColumns);
*skey = INT64_MIN;
while(tSkipListIterNext(pIter)) {
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) break;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) > maxKey) break;
// Convert row data to column data
if (*skey == INT64_MIN) {
*skey = dataRowKey(row);
}
*ekey = dataRowKey(row);
int32_t offset = 0;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0);
memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
offset += pColInfo->info.bytes;
}
numOfRows++;
if (numOfRows > maxRowsToRead) break;
};
return numOfRows;
} }
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) { // copy data from cache into data block
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid};
STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId);
assert(pTable != NULL);
TSKEY skey = 0, ekey = 0;
int32_t rows = 0;
if (pTable->mem != NULL) {
SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData);
rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle);
}
SDataBlockInfo blockInfo = {
.uid = tableId.uid,
.sid = tableId.tid,
.size = rows,
.window = {.skey = skey, .ekey = ekey}
};
return blockInfo;
}
// return null for data block in cache
int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
} }
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) { SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) {
} }
int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {}
int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {}
tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { return NULL; }
SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {}
tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {}
STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {}
STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册