提交 c8916eed 编写于 作者: C Cary Xu

bounded column searching optimization

上级 5a19bd96
...@@ -788,7 +788,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -788,7 +788,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
TDRowTLenT dataRowLen = pHelper->allNullLen; TDRowTLenT dataRowLen = pHelper->allNullLen;
TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE; TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
TDRowTLenT payloadValOffset = 0; TDRowTLenT payloadValOffset = 0;
TDRowLenT colValOffset = 0; TDRowLenT colValOffset = 0;
ASSERT(dataRowLen > 0); ASSERT(dataRowLen > 0);
payloadSetNCols(payload, spd->numOfBound); payloadSetNCols(payload, spd->numOfBound);
...@@ -907,8 +907,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -907,8 +907,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
payloadSetType(payload, SMEM_ROW_DATA); payloadSetType(payload, SMEM_ROW_DATA);
} }
ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW);
*len = (int32_t)(payloadValOffset + colValOffset); *len = (int32_t)(payloadValOffset + colValOffset);
payloadSetTLen(payload, *len); payloadSetTLen(payload, *len);
...@@ -1538,9 +1536,11 @@ static int32_t validateDataSource(SInsertStatementParam *pInsertParam, int32_t t ...@@ -1538,9 +1536,11 @@ static int32_t validateDataSource(SInsertStatementParam *pInsertParam, int32_t t
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo* pColInfo, SSchema* pSchema, static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo* pColInfo, SSchema* pSchema,
char* str, char **end) { char* str, char **end) {
int32_t nCols = pColInfo->numOfCols;
pColInfo->numOfBound = 0; pColInfo->numOfBound = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols); memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
for(int32_t i = 0; i < pColInfo->numOfCols; ++i) { for (int32_t i = 0; i < nCols; ++i) {
pColInfo->cols[i].hasVal = false; pColInfo->cols[i].hasVal = false;
} }
...@@ -1556,7 +1556,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1556,7 +1556,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
} }
bool isOrdered = true; bool isOrdered = true;
int32_t lastColIdx = -1; int32_t lastColIdx = -1; // last column found
while (1) { while (1) {
index = 0; index = 0;
sToken = tStrGetToken(str, &index, false); sToken = tStrGetToken(str, &index, false);
...@@ -1577,7 +1577,8 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1577,7 +1577,8 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
bool findColumnIndex = false; bool findColumnIndex = false;
// todo speedup by using hash list // todo speedup by using hash list
for (int32_t t = 0; t < pColInfo->numOfCols; ++t) { int32_t nScanned = 0, t = lastColIdx + 1;
while (t < nCols) {
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) { if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
if (pColInfo->cols[t].hasVal == true) { if (pColInfo->cols[t].hasVal == true) {
code = tscInvalidOperationMsg(pInsertParam->msg, "duplicated column name", sToken.z); code = tscInvalidOperationMsg(pInsertParam->msg, "duplicated column name", sToken.z);
...@@ -1586,17 +1587,38 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -1586,17 +1587,38 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->cols[t].hasVal = true; pColInfo->cols[t].hasVal = true;
pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->boundedColumns[pColInfo->numOfBound] = t;
pColInfo->numOfBound += 1; ++pColInfo->numOfBound;
findColumnIndex = true; findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false;
}
lastColIdx = t;
break;
}
++t;
++nScanned;
}
if (!findColumnIndex) {
t = 0;
int32_t nRemain = nCols - nScanned;
while (t < nRemain) {
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
if (pColInfo->cols[t].hasVal == true) {
code = tscInvalidOperationMsg(pInsertParam->msg, "duplicated column name", sToken.z);
goto _clean;
}
if (isOrdered) { pColInfo->cols[t].hasVal = true;
if (lastColIdx > t) { pColInfo->boundedColumns[pColInfo->numOfBound] = t;
++pColInfo->numOfBound;
findColumnIndex = true;
if (isOrdered && (lastColIdx > t)) {
isOrdered = false; isOrdered = false;
} else {
lastColIdx = t;
} }
lastColIdx = t;
break;
} }
break; ++t;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册