提交 f851d0c4 编写于 作者: H Haojun Liao

[td-225] update the api in query process functions. remove the tsrpcheadsize...

[td-225] update the api in query process functions. remove the tsrpcheadsize in msg to vnode.refactor some codes.
上级 6be31856
...@@ -89,27 +89,21 @@ typedef struct SVgroupTableInfo { ...@@ -89,27 +89,21 @@ typedef struct SVgroupTableInfo {
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks); STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset); uint32_t offset);
SDataBlockList* tscCreateBlockArrayList(); void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SArray* pDataBlockList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks); STableDataBlocks** dataBlocks);
//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/** /**
*
* for the projection query on metric or point interpolation query on metric, * for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters simultaneously. * we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
* *
......
...@@ -183,11 +183,11 @@ typedef struct STableDataBlocks { ...@@ -183,11 +183,11 @@ typedef struct STableDataBlocks {
SParamInfo *params; SParamInfo *params;
} STableDataBlocks; } STableDataBlocks;
typedef struct SDataBlockList { // todo remove //typedef struct SDataBlockList { // todo remove
uint32_t nSize; // uint32_t nSize;
uint32_t nAlloc; // uint32_t nAlloc;
STableDataBlocks **pData; // STableDataBlocks **pData;
} SDataBlockList; //} SDataBlockList;
typedef struct SQueryInfo { typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately. int16_t command; // the command may be different for each subclause, so keep it seperately.
...@@ -238,8 +238,7 @@ typedef struct { ...@@ -238,8 +238,7 @@ typedef struct {
void * pTableList; // referred table involved in sql void * pTableList; // referred table involved in sql
int32_t batchSize; // for parameter ('?') binding and batch processing int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams; int32_t numOfParams;
SArray *pDataBlocks; // SArray<STableDataBlocks*> submit data blocks after parsing sql
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
} SSqlCmd; } SSqlCmd;
typedef struct SResRec { typedef struct SResRec {
......
...@@ -1038,8 +1038,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1038,8 +1038,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (NULL == pCmd->pTableList) { if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
...@@ -1170,7 +1169,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1170,7 +1169,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _error; goto _error;
} }
tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); taosArrayPush(pCmd->pDataBlocks, &pDataBlock);
strcpy(pDataBlock->filename, fname); strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) { } else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
...@@ -1258,7 +1257,7 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1258,7 +1257,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean; goto _clean;
} }
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1368,8 +1367,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1368,8 +1367,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code; return code;
} }
// the pDataBlock is different from the pTableDataBlocks STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0];
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1400,15 +1398,15 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1400,15 +1398,15 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int32_t rowSize = tinfo.rowSize; int32_t rowSize = tinfo.rowSize;
pCmd->pDataBlocks = tscCreateBlockArrayList(); pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
STableDataBlocks *pTableDataBlock = NULL; STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk),
pTableMetaInfo->name, pTableMeta, &pTableDataBlock); int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return -1; return ret;
} }
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows); code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
if (TSDB_CODE_SUCCESS != code) return -1; if (TSDB_CODE_SUCCESS != code) return -1;
...@@ -1442,7 +1440,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1442,7 +1440,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
return -code; return -code;
} }
pTableDataBlock = pCmd->pDataBlocks->pData[0]; pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
pTableDataBlock->size = sizeof(SSubmitBlk); pTableDataBlock->size = sizeof(SSubmitBlk);
pTableDataBlock->rowSize = tinfo.rowSize; pTableDataBlock->rowSize = tinfo.rowSize;
...@@ -1479,13 +1477,14 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { ...@@ -1479,13 +1477,14 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
int32_t affected_rows = 0; int32_t affected_rows = 0;
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL); assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
SDataBlockList *pDataBlockList = pCmd->pDataBlocks; SArray *pDataBlockList = pCmd->pDataBlocks;
pCmd->pDataBlocks = NULL; pCmd->pDataBlocks = NULL;
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
for (int32_t i = 0; i < pDataBlockList->nSize; ++i) { size_t size = taosArrayGetSize(pDataBlockList);
pDataBlock = pDataBlockList->pData[i]; for (int32_t i = 0; i < size; ++i) {
pDataBlock = taosArrayGetP(pDataBlockList, i );
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
continue; continue;
} }
......
...@@ -331,8 +331,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { ...@@ -331,8 +331,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
binded = pCmd->batchSize / 2; binded = pCmd->batchSize / 2;
} }
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { size_t size = taosArrayGetSize(pCmd->pDataBlocks);
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; for (int32_t i = 0; i < size; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
uint32_t dataSize = totalDataSize / alloced; uint32_t dataSize = totalDataSize / alloced;
assert(dataSize * alloced == totalDataSize); assert(dataSize * alloced == totalDataSize);
...@@ -370,8 +371,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { ...@@ -370,8 +371,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { size_t total = taosArrayGetSize(pCmd->pDataBlocks);
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size += totalDataSize / alloced; pBlock->size += totalDataSize / alloced;
...@@ -395,8 +397,10 @@ static int insertStmtReset(STscStmt* pStmt) { ...@@ -395,8 +397,10 @@ static int insertStmtReset(STscStmt* pStmt) {
SSqlCmd* pCmd = &pStmt->pSql->cmd; SSqlCmd* pCmd = &pStmt->pSql->cmd;
if (pCmd->batchSize > 2) { if (pCmd->batchSize > 2) {
int32_t alloced = (pCmd->batchSize + 1) / 2; int32_t alloced = (pCmd->batchSize + 1) / 2;
for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) {
STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; size_t size = taosArrayGetSize(pCmd->pDataBlocks);
for (int32_t i = 0; i < size; ++i) {
STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i);
uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk);
pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced; pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced;
...@@ -423,15 +427,15 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -423,15 +427,15 @@ static int insertStmtExecute(STscStmt* stmt) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
if (pCmd->pDataBlocks->nSize > 0) { if (taosArrayGetSize(pCmd->pDataBlocks) > 0) {
// merge according to vgid // merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock); code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -4463,7 +4463,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -4463,7 +4463,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->tid = htonl(pTableMeta->sid);
pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->uid = htobe64(pTableMeta->uid);
......
...@@ -176,18 +176,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -176,18 +176,16 @@ int tscSendMsgToServer(SSqlObj *pSql) {
char *pMsg = rpcMallocCont(pCmd->payloadLen); char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (pSql->cmd.command < TSDB_SQL_MGMT) { // set the mgmt ip list
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); if (pSql->cmd.command >= TSDB_SQL_MGMT) {
} else {
pSql->ipList = tscMgmtIpSet; pSql->ipList = tscMgmtIpSet;
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
} }
// tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType, .msgType = pSql->cmd.msgType,
...@@ -222,8 +220,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { ...@@ -222,8 +220,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pObj, pObj->signature); pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -449,18 +447,11 @@ void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -449,18 +447,11 @@ void tscKillSTableQuery(SSqlObj *pSql) {
} }
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart; SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pStart = pSql->cmd.payload + tsRpcHeadSize;
pMsg = pStart;
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pMsg += sizeof(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pRetrieveMsg->free = htons(pQueryInfo->type); pRetrieveMsg->free = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type);
// todo valid the vgroupId at the client side // todo valid the vgroupId at the client side
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -474,12 +465,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -474,12 +465,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId);
} }
pMsg += sizeof(SRetrieveTableMsg); pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg);
pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen);
pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH; pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH;
pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -487,19 +478,18 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -487,19 +478,18 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload + tsRpcHeadSize; char* pMsg = pSql->cmd.payload;
// NOTE: shell message size should not include SMsgDesc // NOTE: shell message size should not include SMsgDesc
int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc);
int32_t vgId = pTableMeta->vgroupInfo.vgId;
SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg;
pMsgDesc->numOfVnodes = htonl(1); // always one vnode
pMsgDesc->numOfVnodes = htonl(1); //todo set the right number of vnodes
pMsg += sizeof(SMsgDesc); pMsg += sizeof(SMsgDesc);
SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg;
int32_t vgId = pTableMeta->vgroupInfo.vgId;
pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.vgId = htonl(vgId);
pShellMsg->header.contLen = htonl(size); pShellMsg->header.contLen = htonl(size);
pShellMsg->length = pShellMsg->header.contLen; pShellMsg->length = pShellMsg->header.contLen;
...@@ -510,7 +500,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -510,7 +500,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo);
tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps); tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes),
pSql->ipList.numOfIps);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -620,9 +611,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -620,9 +611,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return -1; return -1;
} }
char *pStart = pCmd->payload + tsRpcHeadSize; SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart;
int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList);
...@@ -821,7 +810,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -821,7 +810,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
} }
// compressed ts block // compressed ts block
pQueryMsg->tsOffset = htonl(pMsg - pStart); pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload);
int32_t tsLen = 0; int32_t tsLen = 0;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
...@@ -844,7 +833,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -844,7 +833,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
} }
int32_t msgLen = pMsg - pStart; int32_t msgLen = pMsg - pCmd->payload;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
pCmd->payloadLen = msgLen; pCmd->payloadLen = msgLen;
...@@ -1286,10 +1275,12 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { ...@@ -1286,10 +1275,12 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL; pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL;
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1552,150 +1543,6 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1552,150 +1543,6 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
//} //}
int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
#if 0
SSuperTableMetaMsg *pMetaMsg;
char * pMsg, *pStart;
int msgLen = 0;
int tableIndex = 0;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STagCond *pTagCond = &pQueryInfo->tagCond;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
int32_t size = tscEstimateMetricMetaMsgSize(pCmd);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for metric meter msg", pSql);
return -1;
}
pStart = pCmd->payload + tsRpcHeadSize;
pMsg = pStart;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db);
pMsg += sizeof(SMgmtHead);
pMetaMsg = (SSuperTableMetaMsg *)pMsg;
pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables);
pMsg += sizeof(SSuperTableMetaMsg);
int32_t offset = pMsg - (char *)pMetaMsg;
pMetaMsg->join = htonl(offset);
// todo refactor
pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
pMsg += sizeof(int16_t);
memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN);
pMsg += TSDB_TABLE_ID_LEN;
*(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
pMsg += sizeof(int16_t);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i);
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
offset = pMsg - (char *)pMetaMsg;
pMetaMsg->metaElem[i] = htonl(offset);
SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg;
pMsg += sizeof(SSuperTableMetaElemMsg);
// convert to unicode before sending to mnode for metric query
int32_t condLen = 0;
if (pTagCond->numOfTagCond > 0) {
SCond *pCond = tsGetSTableQueryCond(pTagCond, uid);
if (pCond != NULL && pCond->cond != NULL) {
condLen = strlen(pCond->cond) + 1;
bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE);
if (!ret) {
tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCond(pTagCond, uid));
return 0;
}
}
}
pElem->condLen = htonl(condLen);
offset = pMsg - (char *)pMetaMsg;
pElem->cond = htonl(offset);
pMsg += condLen * TSDB_NCHAR_SIZE;
pElem->rel = htons(pTagCond->relType);
if (pTagCond->tbnameCond.uid == uid) {
offset = pMsg - (char *)pMetaMsg;
pElem->tableCond = htonl(offset);
uint32_t len = 0;
if (pTagCond->tbnameCond.cond != NULL) {
len = strlen(pTagCond->tbnameCond.cond);
memcpy(pMsg, pTagCond->tbnameCond.cond, len);
}
pElem->tableCondLen = htonl(len);
pMsg += len;
}
SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr;
if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) {
pElem->orderType = 0;
pElem->orderIndex = 0;
pElem->numOfGroupCols = 0;
} else {
pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols);
for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) {
pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]);
}
if (pGroupby->numOfGroupCols != 0) {
pElem->orderIndex = htons(pGroupby->orderIndex);
pElem->orderType = htons(pGroupby->orderType);
offset = pMsg - (char *)pMetaMsg;
pElem->groupbyTagColumnList = htonl(offset);
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex *pCol = &pQueryInfo->groupbyExpr.columnInfo[j];
SColIndex *pDestCol = (SColIndex *)pMsg;
pDestCol->colIdxInBuf = 0;
pDestCol->colIndex = htons(pCol->colIndex);
pDestCol->colId = htons(pDestCol->colId);
pDestCol->flag = htons(pDestCol->flag);
strncpy(pDestCol->name, pCol->name, tListLen(pCol->name));
pMsg += sizeof(SColIndex);
}
}
}
strcpy(pElem->tableId, pTableMetaInfo->name);
pElem->numOfTags = htons(pTableMetaInfo->numOfTags);
int16_t len = pMsg - (char *)pElem;
pElem->elemLen = htons(len); // redundant data for integrate check
}
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP;
assert(msgLen + minMsgSize() <= size);
#endif
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
char* pMsg = pCmd->payload; char* pMsg = pCmd->payload;
...@@ -1795,7 +1642,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1795,7 +1642,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
for (int i = 0; i < numOfTotalCols; ++i) { for (int i = 0; i < numOfTotalCols; ++i) {
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htons(pSchema->bytes);
pSchema->colId = htons(pSchema->colId); pSchema->colId = htons(pSchema->colId);
if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
assert(i == 0);
}
assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR); assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR);
pSchema++; pSchema++;
} }
......
...@@ -134,7 +134,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con ...@@ -134,7 +134,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
} }
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pSql; return pSql;
} }
......
...@@ -1864,7 +1864,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -1864,7 +1864,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->res.code = pSql->res.code; pParentObj->res.code = pSql->res.code;
} }
// it is not the initial sqlObj, free it
taos_free_result(tres); taos_free_result(tres);
tfree(pSupporter); tfree(pSupporter);
...@@ -1889,12 +1888,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -1889,12 +1888,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SDataBlockList *pDataBlocks = pCmd->pDataBlocks; size_t size = taosArrayGetSize(pCmd->pDataBlocks);
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); assert(size > 0);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0); pSql->pSubs = calloc(size, POINTER_BYTES);
pSql->numOfSubs = size;
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
tscTrace("%p submit data to %zu vnode(s)", pSql, size);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs; pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs; pState->numOfRemain = pSql->numOfSubs;
...@@ -1920,12 +1921,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { ...@@ -1920,12 +1921,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pNew->fetchFp = pNew->fp; pNew->fetchFp = pNew->fp;
pSql->pSubs[numOfSub] = pNew; pSql->pSubs[numOfSub] = pNew;
pRes->code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub++]); STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub);
pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock);
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub); tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub);
numOfSub++;
} else { } else {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub, tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%zu, code:%s", pSql, numOfSub,
pDataBlocks->nSize, tstrerror(pRes->code)); size, tstrerror(pRes->code));
goto _error; goto _error;
} }
} }
......
...@@ -13,8 +13,6 @@ ...@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tscUtil.h"
#include "hash.h"
#include "os.h" #include "os.h"
#include "qast.h" #include "qast.h"
#include "taosmsg.h" #include "taosmsg.h"
...@@ -29,6 +27,8 @@ ...@@ -29,6 +27,8 @@
#include "ttimer.h" #include "ttimer.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscUtil.h"
#include "hash.h"
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
...@@ -428,48 +428,18 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -428,48 +428,18 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
return param; return param;
} }
SDataBlockList* tscCreateBlockArrayList() { void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
const int32_t DEFAULT_INITIAL_NUM_OF_BLOCK = 16; if (pDataBlockList == NULL) {
SDataBlockList* pDataBlockArrayList = calloc(1, sizeof(SDataBlockList));
if (pDataBlockArrayList == NULL) {
return NULL; return NULL;
} }
pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK;
pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc);
if (pDataBlockArrayList->pData == NULL) {
free(pDataBlockArrayList);
return NULL;
}
return pDataBlockArrayList;
}
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) {
if (pList->nSize >= pList->nAlloc) {
pList->nAlloc = (pList->nAlloc) << 1U;
pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc);
// reset allocated memory size_t size = taosArrayGetSize(pDataBlockList);
memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize)); for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(d);
} }
pList->pData[pList->nSize++] = pBlocks; taosArrayDestroy(pDataBlockList);
}
void* tscDestroyBlockArrayList(SDataBlockList* pList) {
if (pList == NULL) {
return NULL;
}
for (int32_t i = 0; i < pList->nSize; i++) {
tscDestroyDataBlock(pList->pData[i]);
}
tfree(pList->pData);
tfree(pList);
return NULL; return NULL;
} }
...@@ -484,7 +454,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -484,7 +454,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache
if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) {
strcpy(pTableMetaInfo->name, pDataBlock->tableId); tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false);
pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta); pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta);
...@@ -497,31 +467,32 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { ...@@ -497,31 +467,32 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs
* additional space. * additional space.
*/ */
int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100); int ret = tscAllocPayload(pCmd, pDataBlock->size + 100);
if (TSDB_CODE_SUCCESS != ret) { if (TSDB_CODE_SUCCESS != ret) {
return ret; return ret;
} }
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); assert(pDataBlock->size <= pDataBlock->nAllocSize);
memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size);
/* /*
* the payloadLen should be actual message body size * the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size * the old value of payloadLen is the allocated payload size
*/ */
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; pCmd->payloadLen = pDataBlock->size;
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0); assert(pCmd->allocSize >= pCmd->payloadLen + 100 && pCmd->payloadLen > 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tscFreeUnusedDataBlocks(SDataBlockList* pList) { //void tscFreeUnusedDataBlocks(SDataBlockList* pList) {
/* release additional memory consumption */ // /* release additional memory consumption */
for (int32_t i = 0; i < pList->nSize; ++i) { // for (int32_t i = 0; i < pList->nSize; ++i) {
STableDataBlocks* pDataBlock = pList->pData[i]; // STableDataBlocks* pDataBlock = pList->pData[i];
pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size); // pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size);
pDataBlock->nAllocSize = (uint32_t)pDataBlock->size; // pDataBlock->nAllocSize = (uint32_t)pDataBlock->size;
} // }
} //}
/** /**
* create the in-memory buffer for each table to keep the submitted data block * create the in-memory buffer for each table to keep the submitted data block
...@@ -568,7 +539,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff ...@@ -568,7 +539,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks) { STableDataBlocks** dataBlocks) {
*dataBlocks = NULL; *dataBlocks = NULL;
...@@ -585,7 +556,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, ...@@ -585,7 +556,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
} }
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
tscAppendDataBlock(pDataBlockList, *dataBlocks); taosArrayPush(pDataBlockList, dataBlocks);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -634,14 +605,15 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { ...@@ -634,14 +605,15 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) {
return len; return len;
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList(); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) { size_t total = taosArrayGetSize(pTableDataBlockList);
STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; for (int32_t i = 0; i < total; ++i) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i);
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
...@@ -679,10 +651,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -679,10 +651,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
tscSortRemoveDataBlockDupRows(pOneTableBlock); tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId,
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e)); pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2);
...@@ -704,7 +676,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi ...@@ -704,7 +676,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
// free the table data blocks; // free the table data blocks;
pCmd->pDataBlocks = pVnodeDataBlockList; pCmd->pDataBlocks = pVnodeDataBlockList;
tscFreeUnusedDataBlocks(pCmd->pDataBlocks); // tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -244,7 +244,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -244,7 +244,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100) #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size #define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
......
...@@ -105,8 +105,9 @@ typedef struct { ...@@ -105,8 +105,9 @@ typedef struct {
void tsdbClearTableCfg(STableCfg *config); void tsdbClearTableCfg(STableCfg *config);
void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes); void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes);
char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id); char* tsdbGetTableName(void *pTable);
STableId tsdbGetTableId(void *pTable);
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
...@@ -176,18 +177,16 @@ typedef struct SQueryRowCond { ...@@ -176,18 +177,16 @@ typedef struct SQueryRowCond {
TSKEY ts; TSKEY ts;
} SQueryRowCond; } SQueryRowCond;
typedef void *TsdbPosT;
/** /**
* Get the data block iterator, starting from position according to the query condition * Get the data block iterator, starting from position according to the query condition
* *
* @param tsdb tsdb handle * @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block * @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition * @param tableqinfoGroupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param qinfo query info handle from query processor * @param qinfo query info handle from query processor
* @return * @return
*/ */
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo); TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
/** /**
* Get the last row of the given query time window for all the tables in STableGroupInfo object. * Get the last row of the given query time window for all the tables in STableGroupInfo object.
...@@ -197,12 +196,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab ...@@ -197,12 +196,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
* @param tsdb tsdb handle * @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each * @param pCond query condition, including time window, result set order, and basic required columns for each
* block * block
* @param groupInfo tableId list. * @param tableqinfoGroupInfo tableId list.
* @return * @return
*/ */
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo); TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle); /**
* get the queried table object list
* @param pHandle
* @return
*/
SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo); void *qinfo);
...@@ -247,37 +251,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta ...@@ -247,37 +251,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
*/ */
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
/**
* todo remove this function later
* @param pQueryHandle
* @param pIdList
* @return
*/
SArray *tsdbRetrieveDataRow(TsdbQueryHandleT *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond);
/**
* Get iterator for super tables, of which tags values satisfy the tag filter info
*
* NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7))
* The filter string is sent from client directly.
* The build of the tags filter expression from string is done in the iterator generating function.
*
* @param pCond query condition
* @param pTagFilterStr tag filter info
* @return
*/
TsdbQueryHandleT *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr);
/**
* Get the qualified tables for (super) table query.
* Used to handle the super table projection queries, the last_row query, the group by on normal columns query,
* the interpolation query, and timestamp-comp query for join processing.
*
* @param pQueryHandle
* @return table sid list. the invoker is responsible for the release of this the sid list.
*/
SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle);
/** /**
* Get the qualified table id for a super table according to the tag query expression. * Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid * @param stableid. super table sid
...@@ -287,6 +260,12 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pT ...@@ -287,6 +260,12 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pT
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols); SColIndex *pColIndex, int32_t numOfCols);
/**
* destory the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList);
/** /**
* create the table group result including only one table, used to handle the normal table query * create the table group result including only one table, used to handle the normal table query
* *
......
...@@ -102,7 +102,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct ...@@ -102,7 +102,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
int64_t tag; int64_t tag;
STimeWindow win; STimeWindow win;
STSCursor cur; STSCursor cur;
STableId id; // for retrieve the page id list void* pTable; // for retrieve the page id list
SWindowResInfo windowResInfo; SWindowResInfo windowResInfo;
} STableQueryInfo; } STableQueryInfo;
...@@ -126,10 +126,10 @@ typedef struct SQueryCostInfo { ...@@ -126,10 +126,10 @@ typedef struct SQueryCostInfo {
uint64_t computTime; uint64_t computTime;
} SQueryCostInfo; } SQueryCostInfo;
typedef struct SGroupItem { //typedef struct SGroupItem {
STableId id; // void *pTable;
STableQueryInfo* info; // STableQueryInfo *info;
} SGroupItem; //} SGroupItem;
typedef struct SQuery { typedef struct SQuery {
int16_t numOfCols; int16_t numOfCols;
...@@ -187,8 +187,8 @@ typedef struct SQInfo { ...@@ -187,8 +187,8 @@ typedef struct SQInfo {
void* tsdb; void* tsdb;
int32_t vgId; int32_t vgId;
STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list> STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
STableGroupInfo groupInfo; // STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
int32_t groupIndex; int32_t groupIndex;
int32_t offset; // offset in group result set of subgroup, todo refactor int32_t offset; // offset in group result set of subgroup, todo refactor
......
此差异已折叠。
...@@ -72,7 +72,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf ...@@ -72,7 +72,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if (ret != 0) { if (ret != 0) {
// dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile,
// strerror(errno)); // strerror(errno));
return -TSDB_CODE_QRY_NO_DISKSPACE; return TSDB_CODE_QRY_NO_DISKSPACE;
} }
pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE;
...@@ -80,7 +80,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf ...@@ -80,7 +80,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf
if (pResultBuf->pBuf == MAP_FAILED) { if (pResultBuf->pBuf == MAP_FAILED) {
// dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno));
return -TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -157,18 +157,16 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { ...@@ -157,18 +157,16 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
return 0; return 0;
} }
void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes) { void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) {
// TODO: this function should be changed also // TODO: this function should be changed also
STsdbMeta *pMeta = tsdbGetMeta(repo);
STable * pTable = tsdbGetTableByUid(pMeta, id->uid);
STSchema *pSchema = tsdbGetTableTagSchema(pTable); STSchema *pSchema = tsdbGetTableTagSchema((STable*) pTable);
STColumn *pCol = tdGetColOfID(pSchema, colId); STColumn *pCol = tdGetColOfID(pSchema, colId);
if (pCol == NULL) { if (pCol == NULL) {
return NULL; // No matched tag volumn return NULL; // No matched tag volumn
} }
char *val = tdGetKVRowValOfCol(pTable->tagVal, colId); char *val = tdGetKVRowValOfCol(((STable*)pTable)->tagVal, colId);
assert(type == pCol->type && bytes == pCol->bytes); assert(type == pCol->type && bytes == pCol->bytes);
if (val != NULL && IS_VAR_DATA_TYPE(type)) { if (val != NULL && IS_VAR_DATA_TYPE(type)) {
...@@ -178,20 +176,21 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i ...@@ -178,20 +176,21 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i
return val; return val;
} }
char *tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id) { char *tsdbGetTableName(void* pTable) {
// TODO: need to change as thread-safe // TODO: need to change as thread-safe
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STable *pTable = tsdbGetTableByUid(pMeta, id->uid);
if (pTable == NULL) { if (pTable == NULL) {
return NULL; return NULL;
} else { } else {
return (char *)pTable->name; return (char*) (((STable *)pTable)->name);
} }
} }
STableId tsdbGetTableId(void *pTable) {
assert(pTable);
return ((STable*)pTable)->tableId;
}
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
if (pMsg == NULL) return NULL; if (pMsg == NULL) return NULL;
......
...@@ -184,15 +184,17 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab ...@@ -184,15 +184,17 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
assert(gsize > 0); assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) { for (int32_t j = 0; j < gsize; ++j) {
STableId* id = (STableId*) taosArrayGet(group, j); STable* pTable = (STable*) taosArrayGetP(group, j);
STableCheckInfo info = { STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey, .lastKey = pQueryHandle->window.skey,
.tableId = *id, .tableId = pTable->tableId,
.pTableObj = tsdbGetTableByUid(pMeta, id->uid), .pTableObj = pTable,
}; };
assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid); assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE));
taosArrayPush(pQueryHandle->pTableCheckInfo, &info); taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
} }
} }
...@@ -215,17 +217,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab ...@@ -215,17 +217,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
return pQueryHandle; return pQueryHandle;
} }
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) { SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
assert(pHandle != NULL); assert(pHandle != NULL);
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle; STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, sizeof(STableId)); SArray* res = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
taosArrayPush(res, &pCheckInfo->tableId); taosArrayPush(res, &pCheckInfo->pTableObj);
} }
return res; return res;
...@@ -1607,6 +1609,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { ...@@ -1607,6 +1609,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
} }
if (index == -1) { if (index == -1) {
// todo add failure test cases
return; return;
} }
...@@ -1856,42 +1859,19 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1856,42 +1859,19 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; }
TsdbQueryHandleT* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) { static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
return NULL;
}
SArray* tsdbGetTableList(TsdbQueryHandleT* pQueryHandle) { return NULL; }
static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) {
SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex); SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex);
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter); SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, &(*pTable)->tableId); taosArrayPush(list, pTable);
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/**
* convert the result pointer to table id instead of table object pointer
* todo remove it by using callback function to change the final result in-time.
* @param pRes
*/
static void convertQueryResult(SArray* pRes, SArray* pTableList) {
if (pTableList == NULL || taosArrayGetSize(pTableList) == 0) {
return;
}
size_t size = taosArrayGetSize(pTableList);
for (int32_t i = 0; i < size; ++i) { // todo speedup by using reserve space.
STable* pTable = taosArrayGetP(pTableList, i);
taosArrayPush(pRes, &pTable->tableId);
}
}
static void destroyHelper(void* param) { static void destroyHelper(void* param) {
if (param == NULL) { if (param == NULL) {
return; return;
...@@ -1960,16 +1940,13 @@ typedef struct STableGroupSupporter { ...@@ -1960,16 +1940,13 @@ typedef struct STableGroupSupporter {
int32_t numOfCols; int32_t numOfCols;
SColIndex* pCols; SColIndex* pCols;
STSchema* pTagSchema; STSchema* pTagSchema;
void* tsdbMeta; // void* tsdbMeta;
} STableGroupSupporter; } STableGroupSupporter;
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STableId* id1 = (STableId*) p1; STable* pTable1 = *(STable**) p1;
STableId* id2 = (STableId*) p2; STable* pTable2 = *(STable**) p2;
STable *pTable1 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id1->uid);
STable *pTable2 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id2->uid);
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) { for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
...@@ -2019,26 +1996,29 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { ...@@ -2019,26 +1996,29 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return 0; return 0;
} }
void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTables, STableGroupSupporter* pSupp, void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
__ext_compar_fn_t compareFn) { __ext_compar_fn_t compareFn) {
STableId* pId = taosArrayGet(pTableIdList, 0); STable* pTable = taosArrayGetP(pTableList, 0);
SArray* g = taosArrayInit(16, sizeof(STableId));
taosArrayPush(g, pId);
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTable);
tsdbRefTable(pTable);
for (int32_t i = 1; i < numOfTables; ++i) { for (int32_t i = 1; i < numOfTables; ++i) {
STableId* prev = taosArrayGet(pTableIdList, i - 1); STable** prev = taosArrayGet(pTableList, i - 1);
STableId* p = taosArrayGet(pTableIdList, i); STable** p = taosArrayGet(pTableList, i);
int32_t ret = compareFn(prev, p, pSupp); int32_t ret = compareFn(prev, p, pSupp);
assert(ret == 0 || ret == -1); assert(ret == 0 || ret == -1);
tsdbRefTable(*p);
assert((*p)->type == TSDB_CHILD_TABLE);
if (ret == 0) { if (ret == 0) {
taosArrayPush(g, p); taosArrayPush(g, p);
} else { } else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, sizeof(STableId)); g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, p); taosArrayPush(g, p);
} }
} }
...@@ -2046,8 +2026,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab ...@@ -2046,8 +2026,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab
taosArrayPush(pGroups, &g); taosArrayPush(pGroups, &g);
} }
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
TSDB_REPO_T* tsdb) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
...@@ -2058,22 +2037,24 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2058,22 +2037,24 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
} }
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, sizeof(STableId)); SArray* sa = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableId* tableId = taosArrayGet(pTableList, i); STable** pTable = taosArrayGet(pTableList, i);
taosArrayPush(sa, tableId); assert((*pTable)->type == TSDB_CHILD_TABLE);
tsdbRefTable(*pTable);
taosArrayPush(sa, pTable);
} }
taosArrayPush(pTableGroup, &sa); taosArrayPush(pTableGroup, &sa);
tsdbTrace("all %zu tables belong to one group", size); tsdbTrace("all %zu tables belong to one group", size);
} else { } else {
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter)); STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
pSupp->tsdbMeta = tsdbGetMeta(tsdb);
pSupp->numOfCols = numOfOrderCols; pSupp->numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema; pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols; pSupp->pCols = pCols;
taosqsort(pTableList->pData, size, sizeof(STableId), pSupp, tableGroupComparFn); taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
tfree(pSupp); tfree(pSupp);
} }
...@@ -2149,48 +2130,53 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) ...@@ -2149,48 +2130,53 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
.pExtInfo = pSTable->tagSchema, .pExtInfo = pSTable->tagSchema,
}; };
SArray* pTableList = taosArrayInit(8, POINTER_BYTES); tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp);
tExprTreeDestroy(&pExpr, destroyHelper); tExprTreeDestroy(&pExpr, destroyHelper);
convertQueryResult(pRes, pTableList);
taosArrayDestroy(pTableList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len, int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) { SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid); tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
return TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
tsdbUnlockRepoMeta(tsdb);
goto _error;
} }
if (pTable->type != TSDB_SUPER_TABLE) { if (pTable->type != TSDB_SUPER_TABLE) {
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid,
pTable->name->data); pTable->name->data);
terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
tsdbUnlockRepoMeta(tsdb);
goto _error;
} }
SArray* res = taosArrayInit(8, sizeof(STableId)); //NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, POINTER_BYTES);
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
int32_t ret = getAllTableIdList(pTable, res); int32_t ret = getAllTableList(pTable, res);
if (ret == TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pGroupInfo->numOfTables = taosArrayGetSize(res); tsdbUnlockRepoMeta(tsdb);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); goto _error;
tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
} else {
// todo add error
} }
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables);
taosArrayDestroy(res); taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret; return ret;
} }
...@@ -2227,31 +2213,45 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT ...@@ -2227,31 +2213,45 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
doQueryTableList(pTable, res, expr); doQueryTableList(pTable, res, expr);
pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid, tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid,
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
taosArrayDestroy(res); taosArrayDestroy(res);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
return ret; return ret;
_error:
return terrno;
} }
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) { int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) { if (pTable == NULL) {
return TSDB_CODE_TDB_INVALID_TABLE_ID; terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error;
} }
//todo assert table type, add the table ref count assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE);
tsdbRefTable(pTable);
if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error;
pGroupInfo->numOfTables = 1; pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableId)); SArray* group = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(group, &pTable->tableId); taosArrayPush(group, &pTable);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error:
return terrno;
} }
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
...@@ -2263,12 +2263,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2263,12 +2263,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
tSkipListDestroyIter(pTableCheckInfo->iter);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem); tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem);
tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem); tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem);
tSkipListDestroyIter(pTableCheckInfo->iter);
if (pTableCheckInfo->pDataCols != NULL) { if (pTableCheckInfo->pDataCols != NULL) {
tfree(pTableCheckInfo->pDataCols->buf); tfree(pTableCheckInfo->pDataCols->buf);
} }
...@@ -2293,3 +2292,26 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { ...@@ -2293,3 +2292,26 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree(pQueryHandle); tfree(pQueryHandle);
} }
void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL);
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
for(int32_t i = 0; i < numOfGroup; ++i) {
SArray* p = taosArrayGetP(pGroupList->pGroupList, i);
size_t numOfTables = taosArrayGetSize(p);
for(int32_t j = 0; j < numOfTables; ++j) {
STable* pTable = taosArrayGetP(p, j);
assert(pTable != NULL);
tsdbUnRefTable(pTable);
}
taosArrayDestroy(p);
}
taosArrayDestroy(pGroupList->pGroupList);
}
...@@ -68,6 +68,7 @@ while $loop <= $loops ...@@ -68,6 +68,7 @@ while $loop <= $loops
while $i < 10 while $i < 10
sql select count(*) from $stb where t1 = $i sql select count(*) from $stb where t1 = $i
if $data00 != $rowNum then if $data00 != $rowNum then
print expect $rowNum, actual: $data00
return -1 return -1
endi endi
$i = $i + 1 $i = $i + 1
......
...@@ -172,6 +172,7 @@ while $loop <= $loops ...@@ -172,6 +172,7 @@ while $loop <= $loops
endi endi
sql select c8 from $stb where t1 = $i sql select c8 from $stb where t1 = $i
if $rows != $rowNum then if $rows != $rowNum then
print expect $rowNum, actual: $rows
return -1 return -1
endi endi
sql select c9 from $stb where t1 = $i sql select c9 from $stb where t1 = $i
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册