提交 2f0b5416 编写于 作者: C Cary Xu

[TS-854]<fix>:generate final row from origin

上级 7745ed78
...@@ -44,12 +44,6 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat ...@@ -44,12 +44,6 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
int32_t allNullLen) { int32_t allNullLen) {
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols)); ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
if (tsForceDataRow) {
pBuilder->memRowType = SMEM_ROW_DATA;
return TSDB_CODE_SUCCESS;
}
if (nRows > 0) { if (nRows > 0) {
// already init(bind multiple rows by single column) // already init(bind multiple rows by single column)
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) { if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
...@@ -452,13 +446,13 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { ...@@ -452,13 +446,13 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, bool *isConverted, int32_t rowSize,
SInsertStatementParam *pInsertParam) { char *tmpTokenBuf, SInsertStatementParam *pInsertParam) {
int32_t tsc_index = 0; int32_t tsc_index = 0;
SStrToken sToken = {0}; SStrToken sToken = {0};
char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header char *row = pDataBlocks->pData +
((*isConverted) ? (pDataBlocks->size - rowSize) : pDataBlocks->size); // skip the SSubmitBlk header
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
STableMeta * pTableMeta = pDataBlocks->pTableMeta; STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = tscGetTableSchema(pTableMeta); SSchema * schema = tscGetTableSchema(pTableMeta);
...@@ -467,6 +461,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -467,6 +461,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int32_t kvLen = pBuilder->kvRowInitLen; int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false; bool isParseBindParam = false;
*isConverted = false;
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
...@@ -571,9 +566,15 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i ...@@ -571,9 +566,15 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
} }
} }
} }
// 4. perform the convert
if (isNeedConvertRow(row)) {
convertSMemRow(row + rowSize, row, pDataBlocks);
*isConverted = true;
}
} }
*len = getExtendedRowSize(pDataBlocks); // *len = getExtendedRowSize(pDataBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -631,13 +632,15 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -631,13 +632,15 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
pDataBlock->boundColumnInfo.allNullLen))) { pDataBlock->boundColumnInfo.allNullLen))) {
return code; return code;
} }
bool isConverted = false;
while (1) { while (1) {
tsc_index = 0; tsc_index = 0;
sToken = tStrGetToken(*str, &tsc_index, false); sToken = tStrGetToken(*str, &tsc_index, false);
if (sToken.n == 0 || sToken.type != TK_LP) break; if (sToken.n == 0 || sToken.type != TK_LP) break;
*str += tsc_index; *str += tsc_index;
if ((*numOfRows) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) { // allocate 1 more row size to facilitate the SDataRow/SKVRow convert
if ((*numOfRows + 1) >= maxRows || pDataBlock->size + extendedRowSize >= pDataBlock->nAllocSize) {
int32_t tSize; int32_t tSize;
code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize); code = tscAllocateMemIfNeed(pDataBlock, extendedRowSize, &tSize);
if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client if (code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
...@@ -645,17 +648,18 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -645,17 +648,18 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
ASSERT(tSize >= maxRows); ASSERT(tSize > maxRows); // 1 more row allocated
maxRows = tSize; maxRows = tSize;
} }
int32_t len = 0; // int32_t len = 0;
code = tsParseOneRow(str, pDataBlock, precision, &len, tmpTokenBuf, pInsertParam);
code = tsParseOneRow(str, pDataBlock, precision, &isConverted, extendedRowSize, tmpTokenBuf, pInsertParam);
if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly if (code != TSDB_CODE_SUCCESS) { // error message has been set in tsParseOneRow, return directly
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
} }
pDataBlock->size += len; pDataBlock->size += extendedRowSize;
tsc_index = 0; tsc_index = 0;
sToken = tStrGetToken(*str, &tsc_index, false); sToken = tStrGetToken(*str, &tsc_index, false);
...@@ -667,6 +671,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn ...@@ -667,6 +671,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
(*numOfRows)++; (*numOfRows)++;
} }
if (isConverted) {
void *convertedSMemRow = pDataBlock->pData + pDataBlock->size;
memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, extendedRowSize);
}
if ((*numOfRows) <= 0) { if ((*numOfRows) <= 0) {
strcpy(pInsertParam->msg, "no any data points"); strcpy(pInsertParam->msg, "no any data points");
...@@ -1728,13 +1736,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1728,13 +1736,19 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error; goto _error;
} }
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows); int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock);
tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows);
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
if (tokenBuf == NULL) { if (tokenBuf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
--maxRows; // 1 more row needed to facilitate the SDataRow/SKVRow convert
ASSERT(maxRows > 0);
bool isConverted = false;
while ((readLen = tgetline(&line, &n, fp)) != -1) { while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
line[--readLen] = 0; line[--readLen] = 0;
...@@ -1747,19 +1761,25 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1747,19 +1761,25 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
char *lineptr = line; char *lineptr = line;
strtolower(line, line); strtolower(line, line);
int32_t len = 0; // int32_t len = 0;
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam);
code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &isConverted, extendedRowSize, tokenBuf,
pInsertParam);
if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code; pSql->res.code = code;
break; break;
} }
pTableDataBlock->size += len; pTableDataBlock->size += extendedRowSize;
if (++count >= maxRows) { if (++count >= maxRows) {
break; break;
} }
} }
if (isConverted) {
void *convertedSMemRow = pTableDataBlock->pData + pTableDataBlock->size;
memcpy(convertedSMemRow - extendedRowSize, convertedSMemRow, extendedRowSize);
}
tfree(tokenBuf); tfree(tokenBuf);
tfree(line); tfree(line);
......
...@@ -1943,18 +1943,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI ...@@ -1943,18 +1943,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
} }
} else { } else {
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
char* payload = (blkKeyTuple + i)->payloadAddr; char* payload = (blkKeyTuple + i)->payloadAddr;
if (isNeedConvertRow(payload)) { TDRowTLenT rowTLen = memRowTLen(payload);
convertSMemRow(pDataBlock, payload, pTableDataBlock); memcpy(pDataBlock, payload, rowTLen);
TDRowTLenT rowTLen = memRowTLen(pDataBlock); pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); pBlock->dataLen += rowTLen;
pBlock->dataLen += rowTLen;
} else {
TDRowTLenT rowTLen = memRowTLen(payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
} }
} }
......
...@@ -46,7 +46,6 @@ extern int32_t tsDnodeId; ...@@ -46,7 +46,6 @@ extern int32_t tsDnodeId;
extern int tsRpcTimer; extern int tsRpcTimer;
extern int tsRpcMaxTime; extern int tsRpcMaxTime;
extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled extern int tsRpcForceTcp; // all commands go to tcp protocol if this is enabled
extern int32_t tsForceDataRow; // use SDataRow forcibly
extern int32_t tsMaxConnections; extern int32_t tsMaxConnections;
extern int32_t tsMaxShellConns; extern int32_t tsMaxShellConns;
extern int32_t tsShellActivityTimer; extern int32_t tsShellActivityTimer;
......
...@@ -54,7 +54,6 @@ int32_t tsDnodeId = 0; ...@@ -54,7 +54,6 @@ int32_t tsDnodeId = 0;
int32_t tsRpcTimer = 300; int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds; int32_t tsRpcMaxTime = 600; // seconds;
int32_t tsRpcForceTcp = 0; // disable this, means query, show command use udp protocol as default int32_t tsRpcForceTcp = 0; // disable this, means query, show command use udp protocol as default
int32_t tsForceDataRow = 0; // disable this at default
int32_t tsMaxShellConns = 50000; int32_t tsMaxShellConns = 50000;
int32_t tsMaxConnections = 5000; int32_t tsMaxConnections = 5000;
int32_t tsShellActivityTimer = 3; // second int32_t tsShellActivityTimer = 3; // second
...@@ -692,16 +691,6 @@ static void doInitGlobalConfig(void) { ...@@ -692,16 +691,6 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "forceDataRow";
cfg.ptr = &tsForceDataRow;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "statusInterval"; cfg.option = "statusInterval";
cfg.ptr = &tsStatusInterval; cfg.ptr = &tsStatusInterval;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 123 #define TSDB_CFG_MAX_NUM 122
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册