diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 8fb6b925efd01fc1e22be509895762868fe28bc6..ccd734ec7b784d290c334628e043bcd757fa3fc2 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -18,9 +18,10 @@ #define _XOPEN_SOURCE -#include "hash.h" #include "os.h" -#include "tscSecondaryMerge.h" + +#include "hash.h" +//#include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" @@ -32,6 +33,8 @@ #include "tstoken.h" #include "ttime.h" +#include "dataformat.h" + enum { TSDB_USE_SERVER_TS = 0, TSDB_USE_CLI_TS = 1, @@ -393,7 +396,6 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error, int16_t timePrec, int32_t *code, char *tmpTokenBuf) { int32_t index = 0; - // bool isPrevOptr; //fang, never used SSQLToken sToken = {0}; char * payload = pDataBlocks->pData + pDataBlocks->size; @@ -604,8 +606,8 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 return TSDB_CODE_SUCCESS; } -static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { - pBlocks->sid = pTableMeta->sid; +static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { + pBlocks->tid = pTableMeta->sid; pBlocks->uid = pTableMeta->uid; pBlocks->sversion = pTableMeta->sversion; pBlocks->numOfRows += numOfRows; @@ -613,10 +615,10 @@ static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pTableM // data block is disordered, sort it in ascending order void sortRemoveDuplicates(STableDataBlocks *dataBuf) { - SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)dataBuf->pData; + SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData; // size is less than the total size, since duplicated rows may be removed yet. - assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SShellSubmitBlock) == dataBuf->size); + assert(pBlocks->numOfRows * dataBuf->rowSize + sizeof(SSubmitBlk) == dataBuf->size); // if use server time, this block must be ordered if (dataBuf->tsSource == TSDB_USE_SERVER_TS) { @@ -624,7 +626,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { } if (!dataBuf->ordered) { - char *pBlockData = pBlocks->payLoad; + char *pBlockData = pBlocks->data; qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); int32_t i = 0; @@ -650,7 +652,7 @@ void sortRemoveDuplicates(STableDataBlocks *dataBuf) { dataBuf->ordered = true; pBlocks->numOfRows = i + 1; - dataBuf->size = sizeof(SShellSubmitBlock) + dataBuf->rowSize * pBlocks->numOfRows; + dataBuf->size = sizeof(SSubmitBlk) + dataBuf->rowSize * pBlocks->numOfRows; } } @@ -663,7 +665,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char STableDataBlocks *dataBuf = NULL; int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SShellSubmitBlock), tinfo.rowSize, pTableMetaInfo->name, + sizeof(SSubmitBlk), tinfo.rowSize, pTableMetaInfo->name, pTableMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { return ret; @@ -691,11 +693,11 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char SParamInfo *param = dataBuf->params + i; if (param->idx == -1) { param->idx = pCmd->numOfParams++; - param->offset -= sizeof(SShellSubmitBlock); + param->offset -= sizeof(SSubmitBlk); } } - SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData); + SSubmitBlk *pBlocks = (SSubmitBlk *)(dataBuf->pData); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); dataBuf->vgId = pTableMeta->vgId; @@ -1141,7 +1143,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { STableDataBlocks *pDataBlock = NULL; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SShellSubmitBlock), pTableMetaInfo->name, + int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pDataBlock); if (ret != TSDB_CODE_SUCCESS) { goto _error_clean; @@ -1353,7 +1355,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock assert(pCmd->numOfClause == 1); STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; - SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData); + SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { @@ -1394,7 +1396,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { pCmd->pDataBlocks = tscCreateBlockArrayList(); STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SShellSubmitBlock), + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); if (ret != TSDB_CODE_SUCCESS) { return -1; @@ -1435,7 +1437,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { } pTableDataBlock = pCmd->pDataBlocks->pData[0]; - pTableDataBlock->size = sizeof(SShellSubmitBlock); + pTableDataBlock->size = sizeof(SSubmitBlk); pTableDataBlock->rowSize = tinfo.rowSize; numOfRows += pSql->res.numOfRows; diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 96215ce73c269c1a2ec0d99bb0d7a6a217b97396..241f24a74777be21ffb557209f20d227ff4ef957 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -325,12 +325,12 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; - uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); + uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); uint32_t dataSize = totalDataSize / alloced; assert(dataSize * alloced == totalDataSize); if (alloced == binded) { - totalDataSize += dataSize + sizeof(SShellSubmitBlock); + totalDataSize += dataSize + sizeof(SSubmitBlk); if (totalDataSize > pBlock->nAllocSize) { const double factor = 1.5; void* tmp = realloc(pBlock->pData, (uint32_t)(totalDataSize * factor)); @@ -342,7 +342,7 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { } } - char* data = pBlock->pData + sizeof(SShellSubmitBlock) + dataSize * binded; + char* data = pBlock->pData + sizeof(SSubmitBlk) + dataSize * binded; for (uint32_t j = 0; j < pBlock->numOfParams; ++j) { SParamInfo* param = pBlock->params + j; int code = doBindParam(data, param, bind + param->idx); @@ -365,10 +365,10 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; - uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); + uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); pBlock->size += totalDataSize / alloced; - SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData; + SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData; pSubmit->numOfRows += pSubmit->numOfRows / alloced; } @@ -398,10 +398,10 @@ static int insertStmtReset(STscStmt* pStmt) { for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; - uint32_t totalDataSize = pBlock->size - sizeof(SShellSubmitBlock); - pBlock->size = sizeof(SShellSubmitBlock) + totalDataSize / alloced; + uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); + pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced; - SShellSubmitBlock* pSubmit = (SShellSubmitBlock*)pBlock->pData; + SSubmitBlk* pSubmit = (SSubmitBlk*)pBlock->pData; pSubmit->numOfRows = pSubmit->numOfRows / alloced; } } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index be0065ff4a0773c009513beb5bddefe80b67c3e1..78d29be1af4e388791396fd17c99c61cb08b4edc 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -58,7 +58,7 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) { return pSTableMeta->schema; } - return pTableMeta->schema; + return (SSchema*) pTableMeta->schema; } SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8fb3df94d5844734ec07ee83e50d193fc0e7c7fa..58fc32c824c287d5e426236c41fee34f38526b00 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -508,14 +508,17 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pRetrieveMsg->free = htons(pQueryInfo->type); pMsg += sizeof(pQueryInfo->type); - pSql->cmd.payloadLen = pMsg - pStart; + pRetrieveMsg->header.vgId = htonl(1); + pMsg += sizeof(SRetrieveTableMsg); + + pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); + pSql->cmd.msgType = TSDB_MSG_TYPE_RETRIEVE; - return TSDB_CODE_SUCCESS; } void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { - //SShellSubmitMsg *pShellMsg; + //SSubmitMsg *pShellMsg; //char * pMsg; //STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); @@ -524,14 +527,14 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { //pMsg = buf + tsRpcHeadSize; //TODO set iplist - //pShellMsg = (SShellSubmitMsg *)pMsg; + //pShellMsg = (SSubmitMsg *)pMsg; //pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode); //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip), // htons(pShellMsg->vnode)); } int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - SShellSubmitMsg *pShellMsg; + SSubmitMsg *pShellMsg; char * pMsg, *pStart; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); @@ -539,24 +542,23 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pStart = pSql->cmd.payload + tsRpcHeadSize; pMsg = pStart; - - pShellMsg = (SShellSubmitMsg *)pMsg; - pShellMsg->desc.numOfVnodes = htonl(1); + SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; + pMsgDesc->numOfVnodes = htonl(1); //set the number of vnodes + pMsg += sizeof(SMsgDesc); - pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); + pShellMsg = (SSubmitMsg *)pMsg; pShellMsg->header.vgId = htonl(pTableMeta->vgId); pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen); + pShellMsg->length = pShellMsg->header.contLen; - pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted + pShellMsg->numOfBlocks = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; + // tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip), // htons(pShellMsg->vnode)); - -// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg); - return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 5d93e44c7712910c8a34805def9dfe9c2ed0aa82..63612d0f5f21d6cbacc3ef7118a2f2825624db08 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -147,7 +147,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid - tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); + tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg); return pObj; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index aa4e5c93aa7a067c00cc4221e574a34e66c56c1f..2a9673b192796a87481fc7f04f57ceb51eab3e41 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -695,6 +695,49 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, return TSDB_CODE_SUCCESS; } +static void trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { + int32_t firstPartLen = 0; + + STableMeta* pTableMeta = pTableDataBlock->pTableMeta; + STableComInfo tinfo = tscGetTableInfo(pTableMeta); + SSchema* pSchema = tscGetTableSchema(pTableMeta); + + memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); + pDataBlock += sizeof(SSubmitBlk); + + int32_t total = sizeof(int32_t)*2; + for(int32_t i = 0; i < tinfo.numOfColumns; ++i) { + switch (pSchema[i].type) { + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_BINARY: { + assert(0); // not support binary yet + firstPartLen += sizeof(int32_t);break; + } + default: + firstPartLen += tDataTypeDesc[pSchema[i].type].nSize; + total += tDataTypeDesc[pSchema[i].type].nSize; + } + } + + char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); + + SSubmitBlk* pBlock = pTableDataBlock->pData; + int32_t rows = htons(pBlock->numOfRows); + + for(int32_t i = 0; i < rows; ++i) { + *(int32_t*) pDataBlock = total; + pDataBlock += sizeof(int32_t); + + *(int32_t*) pDataBlock = firstPartLen; + pDataBlock += sizeof(int32_t); + + memcpy(pDataBlock, p, pTableDataBlock->rowSize); + + p += pTableDataBlock->rowSize; + pDataBlock += pTableDataBlock->rowSize; + } +} + int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { SSqlCmd* pCmd = &pSql->cmd; @@ -716,7 +759,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi return ret; } - int64_t destSize = dataBuf->size + pOneTableBlock->size; + int64_t destSize = dataBuf->size + pOneTableBlock->size + pOneTableBlock->size*sizeof(int32_t)*2; if (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) { dataBuf->nAllocSize = dataBuf->nAllocSize * 1.5; @@ -730,29 +773,33 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize); taosHashCleanup(pVnodeDataBlockHashList); - tfree(dataBuf->pData); tscDestroyBlockArrayList(pVnodeDataBlockList); + tfree(dataBuf->pData); return TSDB_CODE_CLI_OUT_OF_MEMORY; } } - SShellSubmitBlock* pBlocks = (SShellSubmitBlock*)pOneTableBlock->pData; + SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; sortRemoveDuplicates(pOneTableBlock); - char* e = (char*)pBlocks->payLoad + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); + char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); - tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->sid, - pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->payLoad), GET_INT64_VAL(e)); + tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, + pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e)); - pBlocks->sid = htonl(pBlocks->sid); + int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2); + + pBlocks->tid = htonl(pBlocks->tid); pBlocks->uid = htobe64(pBlocks->uid); pBlocks->sversion = htonl(pBlocks->sversion); pBlocks->numOfRows = htons(pBlocks->numOfRows); - - memcpy(dataBuf->pData + dataBuf->size, pOneTableBlock->pData, pOneTableBlock->size); - - dataBuf->size += pOneTableBlock->size; + + pBlocks->len = htonl(len); + + // erase the empty space reserved for binary data + trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock); + dataBuf->size += (len + sizeof(SSubmitBlk)); dataBuf->numOfTables += 1; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index b511a6bf088fef84b88adf16347a56ff022ce8a4..887845b00ed01820ec11ad641a732904145559ab 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -77,7 +77,9 @@ void dnodeRead(SRpcMsg *pMsg) { int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; SRpcContext *pRpcContext = NULL; - + + dTrace("dnode read msg disposal"); + // SMsgDesc *pDesc = pCont; // pDesc->numOfVnodes = htonl(pDesc->numOfVnodes); // pCont += sizeof(SMsgDesc); @@ -229,7 +231,8 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - int32_t code = qCreateQueryInfo(pQueryTableMsg, &pQInfo); + void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; @@ -243,17 +246,17 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { .msgType = 0 }; + rpcSendResponse(&rpcRsp); + // do execute query qTableQuery(pQInfo); - - rpcSendResponse(&rpcRsp); } static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = htobe64(pRetrieve->qhandle); - dTrace("retrieve msg is disposed, qInfo:%p", pQInfo); + dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); int32_t rowSize = 0; int32_t numOfRows = 0; @@ -284,11 +287,12 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { contLen = 100; SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); - pRsp->numOfRows = 0; - pRsp->precision = 0; - pRsp->offset = 0; - pRsp->useconds = 0; - + pRsp->numOfRows = htonl(1); + pRsp->precision = htons(0); + pRsp->offset = htobe64(0); + pRsp->useconds = htobe64(0); + + // todo set the data *(int64_t*) pRsp->data = 1000; rpcRsp = (SRpcMsg) { diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 7ed731c953f0046f9918fc337746eb75e9eda1ce..eb9c42a93fa02cc38b53c55e0fa3e5b6b257d500 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -276,7 +276,10 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { pRsp->affectedRows = htonl(1); pRsp->numOfFailedBlocks = 0; - // todo write to tsdb + void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + assert(tsdb != NULL); + + tsdbInsertData(tsdb, pMsg->pCont); SRpcMsg rpcRsp = { .handle = pMsg->rpcMsg.handle, @@ -285,6 +288,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { .code = 0, .msgType = 0 }; + rpcSendResponse(&rpcRsp); } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 467c2a2995ac15407364ed0413aa3979e4a43676..cc17df9bec97d30b4efe3b6bd5ae9a0cb1d6afff 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -188,14 +188,14 @@ extern char *taosMsg[]; #pragma pack(push, 1) -typedef struct { - int32_t vnode; - int32_t sid; - int32_t sversion; - uint64_t uid; - int16_t numOfRows; - char payLoad[]; -} SShellSubmitBlock; +//typedef struct { +// int32_t vnode; +// int32_t sid; +// int32_t sversion; +// uint64_t uid; +// int16_t numOfRows; +// char payLoad[]; +//} SShellSubmitBlock; typedef struct { int32_t numOfVnodes; @@ -206,13 +206,33 @@ typedef struct SMsgHead { int32_t vgId; } SMsgHead; -typedef struct { - SMsgDesc desc; - SMsgHead header; - int16_t import; - int32_t numOfTables; // total number of sid - char blks[]; // number of data blocks, each table has at least one data block -} SShellSubmitMsg; +//typedef struct { +// SMsgDesc desc; +// SMsgHead header; +// int16_t import; +// int32_t numOfTables; // total number of sid +// char blks[]; // number of data blocks, each table has at least one data block +//} SShellSubmitMsg; + +// Submit message for one table +typedef struct SSubmitBlk { + int64_t uid; // table unique id + int32_t tid; // table id + int32_t padding; // TODO just for padding here + int32_t sversion; // data schema version + int32_t len; // data part length, not including the SSubmitBlk head + int16_t numOfRows; // total number of rows in current submit block + char data[]; +} SSubmitBlk; + +// Submit message for this TSDB +typedef struct SSubmitMsg { + SMsgHead header; + int32_t length; + int32_t compressed:2; + int32_t numOfBlocks:30; + SSubmitBlk blocks[]; +} SSubmitMsg; typedef struct { int32_t index; // index of failed block in submit blocks @@ -506,14 +526,16 @@ typedef struct { } SQueryTableRsp; typedef struct { + SMsgHead header; uint64_t qhandle; uint16_t free; } SRetrieveTableMsg; -typedef struct { +typedef struct SRetrieveTableRsp { int32_t numOfRows; + int8_t completed; // all results are returned to client int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query + int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; char data[]; } SRetrieveTableRsp; diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 8956fb52b13e557ec5f1d36a7210cacff72a35e8..346170cde8a00049bf05b0cbb9d8fc9ad62d03ce 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -171,6 +171,7 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { uint64_t signature; + void* pVnode; TSKEY startTime; int64_t elapsedTime; SResultRec rec; @@ -205,7 +206,7 @@ typedef struct SQInfo { * @param pQInfo * @return */ -int32_t qCreateQueryInfo(SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); /** * query on single table diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 0a5abccdd576710f69419298bc619a806efab143..69ae0c06bd936678e5bf75b16b28638274c784c7 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -14,9 +14,9 @@ */ #include "os.h" -#include "taosmsg.h" #include "hash.h" #include "hashfunc.h" +#include "taosmsg.h" #include "tlog.h" #include "tlosertree.h" #include "tscompression.h" @@ -1482,10 +1482,11 @@ static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool i } } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, bool isSTableQuery) { +static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, SColumnModel *pTagsSchema, int16_t order, + bool isSTableQuery) { dTrace("QInfo:%p setup runtime env", GET_QINFO_ADDR(pRuntimeEnv)); - SQuery* pQuery = pRuntimeEnv->pQuery; - + SQuery *pQuery = pRuntimeEnv->pQuery; + pRuntimeEnv->resultInfo = calloc(pQuery->numOfOutputCols, sizeof(SResultInfo)); pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutputCols, sizeof(SQLFunctionCtx)); @@ -1890,7 +1891,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) { pQuery->checkBufferInLoop = hasMultioutput ? 1 : 0; } - + // pQuery->pointsOffset = pQuery->pointsToRead; } @@ -2242,7 +2243,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { size_t s = taosArrayGetSize(pQInfo->pTableIdList); num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset -// num = pQInfo->pSidSet->numOfSubSet; + // num = pQInfo->pSidSet->numOfSubSet; } assert(num > 0); @@ -2289,7 +2290,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { // tSidSetDestroy(&pQInfo->pSidSet); if (pQInfo->pTableDataInfo != NULL) { -// size_t num = taosHashGetSize(pQInfo->pTableIdList); + // size_t num = taosHashGetSize(pQInfo->pTableIdList); for (int32_t j = 0; j < 0; ++j) { destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); } @@ -2328,7 +2329,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->lastKey = pQuery->window.skey; // create runtime environment -// SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; + // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; // get one queried meter assert(0); @@ -2388,7 +2389,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { taosArrayPush(cols, &pQuery->colList[i]); } - pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(&cond, sa, cols); + pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(NULL, &cond, sa, cols); // metric query do not invoke interpolation, it will be done at the second-stage merge if (!isPointInterpoQuery(pQuery)) { @@ -2409,7 +2410,7 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { */ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { -// assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); + // assert(taosHashGetSize(pQInfo->pTableIdList) >= 1); } #if 0 @@ -2587,23 +2588,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int firstPos, lastPos, midPos = -1; int numOfPoints; TSKEY *keyList; - + if (num <= 0) return -1; - + keyList = (TSKEY *)pValue; firstPos = 0; lastPos = num - 1; - + if (order == 0) { // find the first position which is smaller than the key while (1) { if (key >= keyList[lastPos]) return lastPos; if (key == keyList[firstPos]) return firstPos; if (key < keyList[firstPos]) return firstPos - 1; - + numOfPoints = lastPos - firstPos + 1; midPos = (numOfPoints >> 1) + firstPos; - + if (key < keyList[midPos]) { lastPos = midPos - 1; } else if (key > keyList[midPos]) { @@ -2612,13 +2613,13 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { break; } } - + } else { // find the first position which is bigger than the key while (1) { if (key <= keyList[firstPos]) return firstPos; if (key == keyList[lastPos]) return lastPos; - + if (key > keyList[lastPos]) { lastPos = lastPos + 1; if (lastPos >= num) @@ -2626,10 +2627,10 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { else return lastPos; } - + numOfPoints = lastPos - firstPos + 1; midPos = (numOfPoints >> 1) + firstPos; - + if (key < keyList[midPos]) { lastPos = midPos - 1; } else if (key > keyList[midPos]) { @@ -2639,7 +2640,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { } } } - + return midPos; } @@ -2648,65 +2649,68 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int64_t cnt = 0; dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", - GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); - + GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); + tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; - + while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return cnt; } - + SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle); - + if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) { - TSKEY skey1, ekey1; - STimeWindow w = {0}; - SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; - + TSKEY skey1, ekey1; + STimeWindow w = {0}; + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + if (QUERY_IS_ASC_QUERY(pQuery)) { - doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, &skey1, &ekey1, &w); + doGetAlignedIntervalQueryRangeImpl(pQuery, blockInfo.window.skey, blockInfo.window.skey, pQuery->window.ekey, + &skey1, &ekey1, &w); pWindowResInfo->startTime = w.skey; pWindowResInfo->prevSKey = w.skey; } else { // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp TSKEY winStart = blockInfo.window.ekey - pQuery->intervalTime; - doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, &w); - + doGetAlignedIntervalQueryRangeImpl(pQuery, winStart, pQuery->window.ekey, blockInfo.window.ekey, &skey1, &ekey1, + &w); + pWindowResInfo->startTime = pQuery->window.skey; pWindowResInfo->prevSKey = w.skey; } } - + int32_t numOfRes = 0; - + SDataStatis *pStatis = NULL; - SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); + SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, &blockInfo, &pStatis); int32_t forwardStep = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, &numOfRes, &pRuntimeEnv->windowResInfo, pDataBlock); - -// dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, checked:%d", -// GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, pQueryHandle->cur.slot, -// pQuery->pos, blockInfo.size, forwardStep); - + + // dTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", fileId:%d, slot:%d, pos:%d, rows:%d, + // checked:%d", + // GET_QINFO_ADDR(pQuery), blockInfo.window.skey, blockInfo.window.ekey, pQueryHandle->cur.fileId, + // pQueryHandle->cur.slot, pQuery->pos, blockInfo.size, forwardStep); + // save last access position cnt += forwardStep; if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { break; } } - + // if the result buffer is not full, set the query completed flag if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { setQueryStatus(pQuery, QUERY_COMPLETED); } - + if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { - int32_t step = QUERY_IS_ASC_QUERY(pQuery)? QUERY_ASC_FORWARD_STEP:QUERY_DESC_FORWARD_STEP; - + int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; + closeAllTimeWindow(&pRuntimeEnv->windowResInfo); removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step); pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; @@ -2714,7 +2718,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); } } - + return cnt; } @@ -2976,29 +2980,30 @@ int32_t mergeMetersResultToOneGroups(SQInfo *pQInfo) { int64_t st = taosGetTimestampMs(); int32_t ret = TSDB_CODE_SUCCESS; -// while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { -// int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; -// int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; -// -// assert(0); -// // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); -// if (ret < 0) { // not enough disk space to save the data into disk -// return -1; -// } -// -// pQInfo->subgroupIdx += 1; -// -// // this group generates at least one result, return results -// if (ret > 0) { -// break; -// } -// -// assert(pQInfo->numOfGroupResultPages == 0); -// dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); -// } -// -// dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", GET_QINFO_ADDR(pQuery), -// pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); + // while (pQInfo->subgroupIdx < pQInfo->pSidSet->numOfSubSet) { + // int32_t start = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx]; + // int32_t end = pQInfo->pSidSet->starterPos[pQInfo->subgroupIdx + 1]; + // + // assert(0); + // // ret = doMergeMetersResultsToGroupRes(pQInfo, pQuery, pRuntimeEnv, pQInfo->pTableDataInfo, start, end); + // if (ret < 0) { // not enough disk space to save the data into disk + // return -1; + // } + // + // pQInfo->subgroupIdx += 1; + // + // // this group generates at least one result, return results + // if (ret > 0) { + // break; + // } + // + // assert(pQInfo->numOfGroupResultPages == 0); + // dTrace("QInfo:%p no result in group %d, continue", GET_QINFO_ADDR(pQuery), pQInfo->subgroupIdx - 1); + // } + // + // dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", + // GET_QINFO_ADDR(pQuery), + // pQInfo->subgroupIdx - 1, pQInfo->pSidSet->numOfSubSet, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -3013,10 +3018,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // set current query completed -// if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { - // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; -// return; -// } + // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { + // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; + // return; + // } } SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; @@ -3923,7 +3928,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { -// totalSubset = pQInfo->pSidSet->numOfSubSet; + // totalSubset = pQInfo->pSidSet->numOfSubSet; } return totalSubset; @@ -4303,7 +4308,7 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) { #endif } -int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { +int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param, void* tsdb) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; int32_t code = TSDB_CODE_SUCCESS; @@ -4327,8 +4332,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { pQuery->lastKey = pQuery->window.skey; STsdbQueryCond cond = {0}; - - cond.twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; + + cond.twindow = (STimeWindow){.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; cond.order = pQuery->order.order; cond.colList = *pQuery->colList; @@ -4337,7 +4342,8 @@ int32_t vnodeQueryTablePrepare(SQInfo *pQInfo, void *param) { taosArrayPush(cols, &pQuery->colList[i]); } - pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(&cond, pQInfo->pTableIdList, cols); + + pQInfo->runtimeEnv.pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; pRuntimeEnv->pQuery = pQuery; @@ -5248,41 +5254,42 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { // pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } -void qTableQuery(SQInfo* pQInfo) { +void qTableQuery(SQInfo *pQInfo) { assert(pQInfo != NULL); - + if (pQInfo->killed) { dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } - + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery *pQuery = pRuntimeEnv->pQuery; - -// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); - + SQuery * pQuery = pRuntimeEnv->pQuery; + + // dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); + if (vnodeHasRemainResults(pQInfo)) { /* * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; - + int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); pQuery->rec.pointsRead = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, - (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); - + (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); + doRevisedResultsByLimit(pQInfo); - + pQInfo->pointsInterpo += numOfInterpo; pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - -// dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", -// pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); + + // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", + // pQInfo, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, + // pQInfo->pointsReturned); sem_post(&pQInfo->dataReady); return; } - + // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { // continue to get push data from the group result @@ -5291,66 +5298,68 @@ void qTableQuery(SQInfo* pQInfo) { // todo limit the output for interval query? pQuery->rec.pointsRead = 0; pQInfo->subgroupIdx = 0; // always start from 0 - + if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQInfo->rec.pointsRead += pQuery->rec.pointsRead; - + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); - + if (pQuery->rec.pointsRead > 0) { -// dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", -// pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, -// pQInfo->pointsInterpo, pQInfo->pointsReturned); - + // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d + // totalReturn:%d", + // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, + // pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); + sem_post(&pQInfo->dataReady); return; } } } - -// dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, -// pMeterObj->meterId, pQInfo->pointsRead); - -// vnodePrintQueryStatistics(pSupporter); + + // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, + // pMeterObj->sid, + // pMeterObj->meterId, pQInfo->pointsRead); + + // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); return; } - + // number of points returned during this query pQuery->rec.pointsRead = 0; - + int64_t st = taosGetTimestampUs(); - + // group by normal column, sliding window query, interval query are handled by interval query processor if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) -// assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); + // assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); tableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { assert(pQuery->checkBufferInLoop == 0); - + tableFixedOutputProcessor(pQInfo); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBufferInLoop == 1); tableMultiOutputProcessor(pQInfo); } } - + // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - + /* check if query is killed or not */ if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed", pQInfo); -// pQInfo->over = 1; + // pQInfo->over = 1; } else { -// dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, -// pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); + // dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, + // pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } - + sem_post(&pQInfo->dataReady); -// vnodeDecRefCount(pQInfo); + // vnodeDecRefCount(pQInfo); } void qSuperTableQuery(void *pReadMsg) { @@ -5460,10 +5469,10 @@ static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { return 0; } -static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTableIdList) { +static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList) { pQueryTableMsg->vgId = htons(pQueryTableMsg->vgId); pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); - + pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey); @@ -5483,12 +5492,12 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit); pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset); - + pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset); pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen); pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks); pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder); - + // query msg safety check if (validateQueryMeterMsg(pQueryTableMsg) != 0) { return TSDB_CODE_INVALID_QUERY_MSG; @@ -5590,24 +5599,24 @@ static int32_t convertQueryTableMsg(SQueryTableMsg *pQueryTableMsg, SArray** pTa pQueryTableMsg->colNameList = (int64_t)pMsg; pMsg += pQueryTableMsg->colNameLen; } - + *pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo)); - - STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg; + + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - + taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); - + for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) { pTableIdInfo = (STableIdInfo *)pMsg; - + pTableIdInfo->sid = htonl(pTableIdInfo->sid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - + taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); } @@ -5710,7 +5719,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunctionExpr **pSqlFuncExpr) { *pSqlFuncExpr = NULL; int32_t code = TSDB_CODE_SUCCESS; - + SSqlFunctionExpr *pExprs = (SSqlFunctionExpr *)calloc(1, sizeof(SSqlFunctionExpr) * pQueryMsg->numOfOutputCols); if (pExprs == NULL) { tfree(pQueryMsg->pSqlFuncExprs); @@ -5752,11 +5761,11 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SSqlFunct type = TSDB_DATA_TYPE_DOUBLE; bytes = tDataTypeDesc[type].nSize; - } else { // parse the normal column + } else { // parse the normal column int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].pBase); assert(j < pQueryMsg->numOfCols); - SColumnInfo* pCol = &pQueryMsg->colList[j]; + SColumnInfo *pCol = &pQueryMsg->colList[j]; type = pCol->type; bytes = pCol->bytes; } @@ -5909,7 +5918,8 @@ static int32_t vnodeCreateFilterInfo(void *pQInfo, SQuery *pQuery) { return TSDB_CODE_SUCCESS; } -static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, SArray* pTableIdList) { +static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pExprs, + SArray *pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { goto _clean_memory; @@ -5944,20 +5954,20 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou if (pQuery->colList == NULL) { goto _clean_memory; } - + for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i].info = pQueryMsg->colList[i]; -// SColumnInfo *pColInfo = &pQuery->colList[i].data; -// pColInfo->filters = NULL; -// if (colList[i].numOfFilters > 0) { -// pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); -// -// for (int32_t j = 0; j < colList[i].numOfFilters; ++j) { -// tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]); -// } -// } else { -// pQuery->colList[i].data.filters = NULL; -// } + // SColumnInfo *pColInfo = &pQuery->colList[i].data; + // pColInfo->filters = NULL; + // if (colList[i].numOfFilters > 0) { + // pColInfo->filters = calloc(1, colList[i].numOfFilters * sizeof(SColumnFilterInfo)); + // + // for (int32_t j = 0; j < colList[i].numOfFilters; ++j) { + // tscColumnFilterInfoCopy(&pColInfo->filters[j], &colList[i].filters[j]); + // } + // } else { + // pQuery->colList[i].data.filters = NULL; + // } } // calculate the result row size @@ -6003,9 +6013,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou // to make sure third party won't overwrite this structure pQInfo->signature = (uint64_t)pQInfo; pQInfo->pTableIdList = pTableIdList; - + pQuery->pos = -1; - // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo); + // dTrace("vid:%d sid:%d meterId:%s, QInfo is allocated:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, + // pQInfo); return pQInfo; @@ -6035,7 +6046,7 @@ bool isQInfoValid(void *param) { if (pQInfo == NULL) { return false; } - + /* * pQInfo->signature may be changed by another thread, so we assign value of signature * into local variable, then compare by using local variable @@ -6044,75 +6055,75 @@ bool isQInfoValid(void *param) { return (sig == (uint64_t)pQInfo); } -void vnodeFreeQInfo(SQInfo* pQInfo, bool decQueryRef) { +void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { if (!isQInfoValid(pQInfo)) { return; } - + pQInfo->killed = 1; dTrace("QInfo:%p start to free SQInfo", pQInfo); - + if (decQueryRef) { vnodeDecMeterRefcnt(pQInfo); } - + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - + for (int col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } - -// for (int col = 0; col < pQuery->numOfCols; ++col) { -// vnodeFreeColumnInfo(&pQuery->colList[col].data); -// } -// -// if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { -// tfree(pQuery->tsData); -// } - + + // for (int col = 0; col < pQuery->numOfCols; ++col) { + // vnodeFreeColumnInfo(&pQuery->colList[col].data); + // } + // + // if (pQuery->colList[0].colIdx != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + // tfree(pQuery->tsData); + // } + sem_destroy(&(pQInfo->dataReady)); vnodeQueryFreeQInfoEx(pQInfo); - + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pColFilter->numOfFilters > 0) { tfree(pColFilter->pFilters); } } - + tfree(pQuery->pFilterInfo); tfree(pQuery->colList); tfree(pQuery->sdata); - + if (pQuery->pSelectExpr != NULL) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { SSqlBinaryExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo; - + if (pBinExprInfo->numOfCols > 0) { tfree(pBinExprInfo->pReqColumns); tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); } } - + tfree(pQuery->pSelectExpr); } - + if (pQuery->defaultVal != NULL) { tfree(pQuery->defaultVal); } - + tfree(pQuery->pGroupbyExpr); tfree(pQuery); - -// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); - - //destroy signature, in order to avoid the query process pass the object safety check + + // dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); + + // destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); tfree(pQInfo); } static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, - SArray* pTableIdList, SQInfo **pQInfo) { + SArray *pTableIdList, void* tsdb, SQInfo **pQInfo) { int32_t code = TSDB_CODE_SUCCESS; (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pSqlExprs, pTableIdList); @@ -6121,16 +6132,12 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE goto _error; } - SQuery* pQuery = (*pQInfo)->runtimeEnv.pQuery; + SQuery *pQuery = (*pQInfo)->runtimeEnv.pQuery; dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); -// STableIdInfo **pTableIdList = (STableIdInfo **)pQueryMsg->pSidExtInfo; -// if (pTableIdList != NULL && pTableIdList[0]->key > 0) { -// pQuery->window.skey = pTableIdList[0]->key; -// } else { pQuery->window.skey = pQueryMsg->window.skey; pQuery->window.ekey = pQueryMsg->window.ekey; - + pQuery->lastKey = pQuery->window.skey; if (sem_init(&(*pQInfo)->dataReady, 0, 0) != 0) { @@ -6151,7 +6158,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE tsBufNextPos(pTSBuf); } - if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf)) != TSDB_CODE_SUCCESS) { + if ((code = vnodeQueryTablePrepare(*pQInfo, pTSBuf, tsdb)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -6163,18 +6170,18 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE // dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, // pQInfo->refCount); return code; - + _error: // table query ref will be decrease during error handling vnodeFreeQInfo(*pQInfo, false); return code; } -int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; - SArray* pTableIdList = NULL; + SArray *pTableIdList = NULL; if ((code = convertQueryTableMsg(pQueryTableMsg, &pTableIdList)) != TSDB_CODE_SUCCESS) { return code; } @@ -6205,14 +6212,14 @@ int32_t qCreateQueryInfo(SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) { // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { - code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, pQInfo); + code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); } _query_over: if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pTableIdList); } - + // if failed to add ref for all meters in this query, abort current query // if (code != TSDB_CODE_SUCCESS) { // vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber); @@ -6231,31 +6238,31 @@ _query_over: return TSDB_CODE_SUCCESS; } -int32_t qRetrieveQueryResultInfo(SQInfo* pQInfo, int32_t *numOfRows, int32_t* rowsize) { +int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *rowsize) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } - + if (pQInfo->killed) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; - } else { // in case of not TSDB_CODE_SUCCESS, return the code to client + } else { // in case of not TSDB_CODE_SUCCESS, return the code to client return abs(pQInfo->code); } } - + sem_wait(&pQInfo->dataReady); - - SQuery* pQuery = pQInfo->runtimeEnv.pQuery; - -// *numOfRows = pQInfo->rec.pointsRead; -// *rowsize = pQuery->rowSize; + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + + // *numOfRows = pQInfo->rec.pointsRead; + // *rowsize = pQuery->rowSize; *numOfRows = 1; - -// dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, -// *rowsize, *numOfRows, pQInfo->code); - + + // dTrace("QInfo:%p, retrieve data info completed, precision:%d, rowsize:%d, rows:%d, code:%d", pQInfo, *timePrec, + // *rowsize, *numOfRows, pQInfo->code); + if (pQInfo->code < 0) { // less than 0 means there are error existed. return -pQInfo->code; } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 85b1d3d206f07647c2fd68c9e61e7f8d7367f290..c46f45fd37178147c7bb2903462290f861ac6189 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -15,7 +15,6 @@ #include "os.h" #include "tlog.h" -// #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" @@ -395,6 +394,7 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { SSkipListNode *px = pSkipList->pHead; SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0}; + bool identical = false; for (int32_t i = pSkipList->level - 1; i >= 0; --i) { SSkipListNode *p = SL_GET_FORWARD_POINTER(px, i); while (p != NULL) { @@ -402,11 +402,16 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { char *newDatakey = SL_GET_NODE_KEY(pSkipList, pNode); // if the forward element is less than the specified key, forward one step - if (pSkipList->comparFn(key, newDatakey) < 0) { + int32_t ret = pSkipList->comparFn(key, newDatakey); + if (ret < 0) { px = p; p = SL_GET_FORWARD_POINTER(px, i); } else { + if (identical == false) { + identical = (ret == 0); + } + break; } } @@ -418,17 +423,12 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode) { } // if the skip list does not allowed identical key inserted, the new data will be discarded. - if (pSkipList->keyInfo.dupKey == 0 && forward[0] != pSkipList->pHead) { - char *key = SL_GET_NODE_KEY(pSkipList, forward[0]); - char *pNewDataKey = SL_GET_NODE_KEY(pSkipList, pNode); - - if (pSkipList->comparFn(key, pNewDataKey) == 0) { - if (pSkipList->lock) { - pthread_rwlock_unlock(pSkipList->lock); - } - - return forward[0]; + if (pSkipList->keyInfo.dupKey == 0 && identical) { + if (pSkipList->lock) { + pthread_rwlock_unlock(pSkipList->lock); } + + return forward[0]; } #if SKIP_LIST_RECORD_PERFORMANCE diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index 1368515cfdf6b85716173611a45d513e134aa730..92d8ad757b864062f459f92f7fcb4aa7fedebb08 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -90,15 +90,6 @@ int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg); int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId); int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg); -// Submit message for one table -typedef struct { - STableId tableId; - int32_t padding; // TODO just for padding here - int32_t sversion; // data schema version - int32_t len; // data part length, not including the SSubmitBlk head - char data[]; -} SSubmitBlk; - typedef struct { int32_t totalLen; int32_t len; @@ -108,15 +99,10 @@ typedef struct { int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); -// Submit message for this TSDB -typedef struct { - int32_t length; - int32_t compressed; - SSubmitBlk blocks[]; -} SSubmitMsg; - #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) +struct STsdbRepo; + // SSubmitMsg Iterator typedef struct { int32_t totalLen; @@ -245,7 +231,7 @@ typedef void *tsdbpos_t; * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); +tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo); /** * move to next block diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 2324ead45163ee1bb467f0ae98279fd6587918ea..7f3acb66249570c015dfb0bf31106907d8c047a5 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -63,6 +63,26 @@ typedef struct { SFileGroup fGroup[]; } STsdbFileH; +/** + * if numOfSubBlocks == -1, then the SCompBlock is a sub-block + * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to + * the data block offset and length + * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the + * binary + */ +typedef struct { + int64_t last : 1; // If the block in data file or last file + int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks + int32_t algorithm : 8; // Compression algorithm + int32_t numOfPoints : 24; // Number of total points + int32_t sversion; // Schema version + int32_t len; // Data block length or nothing + int16_t numOfSubBlocks; // Number of sub-blocks; + int16_t numOfCols; + TSKEY keyFirst; + TSKEY keyLast; +} SCompBlock; + #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index aba506f30d85816f06192b48a5158f0cbb604a21..059ec87e914bbcfd416a3b160dfdeabe5e911065 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -44,7 +44,7 @@ typedef struct { typedef struct STable { int8_t type; STableId tableId; - int32_t superUid; // Super table UID + int64_t superUid; // Super table UID int32_t sversion; STSchema * schema; STSchema * tagSchema; @@ -98,6 +98,8 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta); #define TSDB_TABLE_OF_ID(pHandle, id) ((pHandle)->pTables)[id] #define TSDB_GET_TABLE_OF_NAME(pHandle, name) /* TODO */ +STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo); + int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 1c91c03b44ae28aed3a043e2410a5ca2ea79d990..98562be0cce5e057049c20d4f586453522908750 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -33,26 +33,6 @@ typedef struct { int64_t offset; } SCompIdx; -/** - * if numOfSubBlocks == -1, then the SCompBlock is a sub-block - * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to - * the data block offset and length - * if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the - * binary - */ -typedef struct { - int64_t last : 1; // If the block in data file or last file - int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks - int32_t algorithm : 8; // Compression algorithm - int32_t numOfPoints : 24; // Number of total points - int32_t sversion; // Schema version - int32_t len; // Data block length or nothing - int16_t numOfSubBlocks; // Number of sub-blocks; - int16_t numOfCols; - TSKEY keyFirst; - TSKEY keyLast; -} SCompBlock; - typedef struct { int32_t delimiter; // For recovery usage int32_t checksum; // TODO: decide if checksum logic in this file or make it one API diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index a8a80dd1641ac0939187ec9abd24fb24ddef0246..b07cce1b3bee1371884e7bf49dab2f2f3332158e 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -499,6 +499,10 @@ SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL || pIter == NULL) return -1; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + pMsg->compressed = htonl(pMsg->compressed); + pIter->totalLen = pMsg->length; pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { @@ -513,7 +517,15 @@ int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { SSubmitBlk *pBlock = pIter->pBlock; if (pBlock == NULL) return NULL; - + + pBlock->len = htonl(pBlock->len); + pBlock->numOfRows = htons(pBlock->numOfRows); + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + + pBlock->sversion = htonl(pBlock->sversion); + pBlock->padding = htonl(pBlock->padding); + pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->len; if (pIter->len >= pIter->totalLen) { pIter->pBlock = NULL; @@ -524,6 +536,11 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { return pBlock; } +STsdbMeta* tsdbGetMeta(tsdb_repo_t* pRepo) { + STsdbRepo *tsdb = (STsdbRepo *)pRepo; + return tsdb->tsdbMeta; +} + // Check the configuration and set default options static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // Check precision @@ -695,6 +712,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); TSKEY key = dataRowKey(row); + printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); + // Copy row into the memory SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); if (pNode == NULL) { @@ -715,7 +734,9 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable tSkipListPut(pTable->mem->pData, pNode); if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; - pTable->mem->numOfPoints++; + + pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); +// pTable->mem->numOfPoints++; return 0; } @@ -723,7 +744,8 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { STsdbRepo *pRepo = (STsdbRepo *)repo; - STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, pBlock->tableId); + STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; + STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); if (pTable == NULL) return -1; SSubmitBlkIter blkIter; diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index a62299c45fa83b7e021e2e91863e10eea9126046..d654308f9956dc34d92ba902ac9d0c087f608b6d 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -14,53 +14,378 @@ */ #include "os.h" +#include "tutil.h" + #include "tsdb.h" +#include "tsdbFile.h" +#include "tsdbMeta.h" -tsdb_query_handle_t *tsdbQueryByTableId(STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { +#define EXTRA_BYTES 2 +#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoEx *)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) +#define QUERY_IS_ASC_QUERY(o) (o == TSQL_SO_ASC) +#define QH_GET_NUM_OF_COLS(handle) (taosArrayGetSize((handle)->pColumns)) -} +typedef struct SField { + // todo need the definition +} SField; -bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { - return false; -} +typedef struct SHeaderFileInfo { + int32_t fileId; +} SHeaderFileInfo; -SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { +typedef struct SQueryHandlePos { + int32_t fileId; + int32_t slot; + int32_t pos; + int32_t fileIndex; +} SQueryHandlePos; -} +typedef struct SDataBlockLoadInfo { + int32_t fileListIndex; + int32_t fileId; + int32_t slotIdx; + int32_t sid; + SArray *pLoadedCols; +} SDataBlockLoadInfo; -int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) { +typedef struct SLoadCompBlockInfo { + int32_t sid; /* meter sid */ + int32_t fileId; + int32_t fileListIndex; +} SLoadCompBlockInfo; -} +typedef struct SQueryFilesInfo { + SArray *pFileInfo; + int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. + int32_t vnodeId; -SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { + int32_t headerFd; // header file fd + int64_t headerFileSize; + int32_t dataFd; + int32_t lastFd; + + char headerFilePath[PATH_MAX]; // current opened header file name + char dataFilePath[PATH_MAX]; // current opened data file name + char lastFilePath[PATH_MAX]; // current opened last file path + char dbFilePathPrefix[PATH_MAX]; +} SQueryFilesInfo; + +typedef struct STableQueryRec { + TSKEY lastKey; + STable * pTableObj; + int64_t offsetInHeaderFile; + int32_t numOfBlocks; + int32_t start; + SCompBlock *pBlock; +} STableQueryRec; + +typedef struct { + SCompBlock *compBlock; + SField * fields; +} SCompBlockFields; + +typedef struct STableDataBlockInfoEx { + SCompBlockFields pBlock; + STableQueryRec * pMeterDataInfo; + int32_t blockIndex; + int32_t groupIdx; /* number of group is less than the total number of meters */ +} STableDataBlockInfoEx; + +typedef struct STsdbQueryHandle { + struct STsdbRepo* pTsdb; + + SQueryHandlePos cur; // current position + SQueryHandlePos start; // the start position, used for secondary/third iteration + int32_t unzipBufSize; + char *unzipBuffer; + char *secondaryUnzipBuffer; + + SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ + SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ + + SQueryFilesInfo vnodeFileInfo; + + int16_t numOfRowsPerPage; + uint16_t flag; // denotes reversed scan of data or not + int16_t order; + STimeWindow window; // the primary query time window that applies to all queries + TSKEY lastKey; + int32_t blockBufferSize; + SCompBlock *pBlock; + int32_t numOfBlocks; + SField ** pFields; + SArray * pColumns; // column list, SColumnInfoEx array list + SArray * pTableIdList; // table id object list + bool locateStart; + int32_t realNumOfRows; + bool loadDataAfterSeek; // load data after seek. + + STableDataBlockInfoEx *pDataBlockInfoEx; + STableQueryRec * pTableQueryInfo; + int32_t tableIndex; + bool isFirstSlot; + void * qinfo; // query info handle, for debug purpose +} STsdbQueryHandle; + +int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) { + // record the maximum column width among columns of this meter/metric + SColumnInfoEx *pColumn = taosArrayGet(pQueryHandle->pColumns, 0); + + int32_t maxColWidth = pColumn->info.bytes; + for (int32_t i = 1; i < QH_GET_NUM_OF_COLS(pQueryHandle); ++i) { + int32_t bytes = pColumn[i].info.bytes; + if (bytes > maxColWidth) { + maxColWidth = bytes; + } + } + + // only one unzip buffer required, since we can unzip each column one by one + pQueryHandle->unzipBufSize = (size_t)(maxColWidth * rowsPerFileBlock + EXTRA_BYTES); // plus extra_bytes + pQueryHandle->unzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize); + + pQueryHandle->secondaryUnzipBuffer = (char *)calloc(1, pQueryHandle->unzipBufSize); + + if (pQueryHandle->unzipBuffer == NULL || pQueryHandle->secondaryUnzipBuffer == NULL) { + goto _error_clean; + } + + return TSDB_CODE_SUCCESS; +_error_clean: + tfree(pQueryHandle->unzipBuffer); + tfree(pQueryHandle->secondaryUnzipBuffer); + + return TSDB_CODE_SERV_OUT_OF_MEMORY; } -int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow* window, tsdbpos_t position, int16_t order) { +static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { + pVnodeFilesInfo->current = -1; + pVnodeFilesInfo->headerFileSize = -1; + + pVnodeFilesInfo->headerFd = FD_INITIALIZER; // set the initial value + pVnodeFilesInfo->dataFd = FD_INITIALIZER; + pVnodeFilesInfo->lastFd = FD_INITIALIZER; +} +static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) { + pBlockLoadInfo->slotIdx = -1; + pBlockLoadInfo->fileId = -1; + pBlockLoadInfo->sid = -1; + pBlockLoadInfo->fileListIndex = -1; } -int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) { +static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { + pCompBlockLoadInfo->sid = -1; + pCompBlockLoadInfo->fileId = -1; + pCompBlockLoadInfo->fileListIndex = -1; +} +static int fileOrderComparFn(const void *p1, const void *p2) { + SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1; + SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2; + + if (pInfo1->fileId == pInfo2->fileId) { + return 0; + } + + return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1; } -tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { - return NULL; +void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) { + char suffix[] = ".head"; + pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t)); + + struct dirent *pEntry = NULL; + pVnodeFilesInfo->vnodeId = vnodeId; + char* tsDirectory = ""; + + sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); + DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); + if (pDir == NULL) { + // dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix, + // strerror(errno)); + return; + } + + while ((pEntry = readdir(pDir)) != NULL) { + if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) { + continue; + } + + if (pEntry->d_type & DT_DIR) { + continue; + } + + size_t len = strlen(pEntry->d_name); + if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) { + continue; + } + + int32_t vid = 0; + int32_t fid = 0; + sscanf(pEntry->d_name, "v%df%d", &vid, &fid); + if (vid != vnodeId) { /* ignore error files */ + // dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId); + continue; + } + +// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1; +// if (fid > pVnode->fileId || fid < firstFid) { +// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId, +// fid, firstFid, pVnode->fileId); +// continue; +// } + + assert(fid >= 0 && vid >= 0); + taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid); + } + + closedir(pDir); + + // dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, + // pVnodeFilesInfo->dbFilePathPrefix); + + // order the files information according their names */ + size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo); + qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn); } -SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) { +tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { + // todo 1. filter not exist table + + // todo 2. add the reference count for each table that is involved in query + + STsdbQueryHandle *pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); + pQueryHandle->order = pCond->order; + pQueryHandle->window = pCond->twindow; + pQueryHandle->pTsdb = tsdb; + + pQueryHandle->pTableIdList = idList; + pQueryHandle->pColumns = pColumnInfo; + pQueryHandle->loadDataAfterSeek = false; + pQueryHandle->isFirstSlot = true; + pQueryHandle->lastKey = pQueryHandle->window.skey; // ascending query + + // malloc buffer in order to load data from file + int32_t numOfCols = taosArrayGetSize(pColumnInfo); + size_t bufferCapacity = 4096; + + pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoEx)); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoEx *pCol = taosArrayGet(pColumnInfo, i); + SColumnInfoEx pDest = {{0}, 0}; + + pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes); + pDest.info = pCol->info; + taosArrayPush(pQueryHandle->pColumns, &pDest); + } + + if (doAllocateBuf(pQueryHandle, bufferCapacity) != TSDB_CODE_SUCCESS) { + return NULL; + } + + initQueryFileInfoFD(&pQueryHandle->vnodeFileInfo); + vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); + vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); + + int32_t vnodeId = 1; + vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo); + + return (tsdb_query_handle_t)pQueryHandle; } -tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) { +static int32_t next = 1; +bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { + if (next == 0) { + return false; + } else { + next = 0; + return true; + } +} +static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, + TSKEY* skey, TSKEY* ekey, STsdbQueryHandle* pHandle) { + int numOfRows = 0; + int32_t numOfCols = taosArrayGetSize(pHandle->pColumns); + *skey = INT64_MIN; + + while(tSkipListIterNext(pIter)) { + SSkipListNode *node = tSkipListIterGet(pIter); + if (node == NULL) break; + + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) > maxKey) break; + // Convert row data to column data + + if (*skey == INT64_MIN) { + *skey = dataRowKey(row); + } + + *ekey = dataRowKey(row); + + int32_t offset = 0; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoEx* pColInfo = taosArrayGet(pHandle->pColumns, 0); + memcpy(pColInfo->pData + numOfRows*pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes); + offset += pColInfo->info.bytes; + } + + numOfRows++; + if (numOfRows > maxRowsToRead) break; + }; + + return numOfRows; } -STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) { +// copy data from cache into data block +SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { + STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; + STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0); + + STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; + STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pHandle->pTsdb), tableId); + assert(pTable != NULL); + + TSKEY skey = 0, ekey = 0; + int32_t rows = 0; + + if (pTable->mem != NULL) { + SSkipListIterator* iter = tSkipListCreateIter(pTable->mem->pData); + rows = tsdbReadRowsFromCache(iter, INT64_MAX, 4000, &skey, &ekey, pHandle); + } + + SDataBlockInfo blockInfo = { + .uid = tableId.uid, + .sid = tableId.tid, + .size = rows, + .window = {.skey = skey, .ekey = ekey} + }; + + return blockInfo; +} +// return null for data block in cache +int32_t tsdbRetrieveDataBlockStatisInfo(tsdb_query_handle_t *pQueryHandle, SDataStatis **pBlockStatis) { + *pBlockStatis = NULL; + return TSDB_CODE_SUCCESS; } -STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) { +SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList) { } +int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order) {} + +int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos) {} + +tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle) { return NULL; } + +SArray *tsdbRetrieveDataRow(tsdb_query_handle_t *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond) {} + +tsdb_query_handle_t *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr) {} + +STableIDList *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle) {} + +STableIDList *tsdbQueryTableList(int16_t stableId, const char *pTagCond) {}