提交 bdd8292e 编写于 作者: L lihui

[#1059 TBASE-1436]

上级 7629406d
...@@ -36,7 +36,7 @@ enum { ...@@ -36,7 +36,7 @@ enum {
TSDB_USE_CLI_TS = 1, TSDB_USE_CLI_TS = 1,
}; };
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize); static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) { static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
int32_t numType = isValidNumber(pToken); int32_t numType = isValidNumber(pToken);
...@@ -522,14 +522,15 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe ...@@ -522,14 +522,15 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
*str += index; *str += index;
if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) { if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) {
int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize); int32_t tSize;
if (0 == tSize) { //TODO pass the correct error code to client int32_t retcode = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize, &tSize);
if (retcode != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(error, "client out of memory"); strcpy(error, "client out of memory");
*code = TSDB_CODE_CLI_OUT_OF_MEMORY; *code = retcode;
return -1; return -1;
} }
ASSERT(tSize > maxRows);
maxRows += tSize; maxRows = tSize;
} }
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf); int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
...@@ -574,7 +575,7 @@ static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema, ...@@ -574,7 +575,7 @@ static void tscSetAssignedColumnInfo(SParsedDataColInfo *spd, SSchema *pSchema,
} }
} }
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) { int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
size_t remain = pDataBlock->nAllocSize - pDataBlock->size; size_t remain = pDataBlock->nAllocSize - pDataBlock->size;
const int factor = 5; const int factor = 5;
uint32_t nAllocSizeOld = pDataBlock->nAllocSize; uint32_t nAllocSizeOld = pDataBlock->nAllocSize;
...@@ -594,11 +595,13 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) { ...@@ -594,11 +595,13 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize) {
//assert(false); //assert(false);
// do nothing // do nothing
pDataBlock->nAllocSize = nAllocSizeOld; pDataBlock->nAllocSize = nAllocSizeOld;
return 0; *numOfRows = (int32_t)(pDataBlock->nAllocSize) / rowSize;
return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
} }
return (int32_t)(pDataBlock->nAllocSize - pDataBlock->size) / rowSize; *numOfRows = (int32_t)(pDataBlock->nAllocSize) / rowSize;
return TSDB_CODE_SUCCESS;
} }
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterMeta, int32_t numOfRows) { static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterMeta, int32_t numOfRows) {
...@@ -664,8 +667,9 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -664,8 +667,9 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
return ret; return ret;
} }
int32_t maxNumOfRows = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize); int32_t maxNumOfRows;
if (0 == maxNumOfRows) { ret = tscAllocateMemIfNeed(dataBuf, pMeterMeta->rowSize, &maxNumOfRows);
if (TSDB_CODE_SUCCESS != ret) {
return TSDB_CODE_CLI_OUT_OF_MEMORY; return TSDB_CODE_CLI_OUT_OF_MEMORY;
} }
...@@ -1326,7 +1330,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1326,7 +1330,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
char * line = NULL; char * line = NULL;
size_t n = 0; size_t n = 0;
int len = 0; int len = 0;
uint32_t maxRows = 0; int32_t maxRows = 0;
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
int numOfRows = 0; int numOfRows = 0;
int32_t code = 0; int32_t code = 0;
...@@ -1345,8 +1349,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1345,8 +1349,8 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock);
maxRows = tscAllocateMemIfNeed(pTableDataBlock, rowSize); code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows);
if (maxRows < 1) return -1; if (TSDB_CODE_SUCCESS != code) return -1;
int count = 0; int count = 0;
SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns}; SParsedDataColInfo spd = {.numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns};
...@@ -1362,12 +1366,6 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1362,12 +1366,6 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
char *lineptr = line; char *lineptr = line;
strtolower(line, line); strtolower(line, line);
if (numOfRows >= maxRows || pTableDataBlock->size + pMeterMeta->rowSize >= pTableDataBlock->nAllocSize) {
uint32_t tSize = tscAllocateMemIfNeed(pTableDataBlock, pMeterMeta->rowSize);
if (0 == tSize) return (-TSDB_CODE_CLI_OUT_OF_MEMORY);
maxRows += tSize;
}
len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, tmpTokenBuf); len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, pMeterMeta->precision, &code, tmpTokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) { if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code; pSql->res.code = code;
......
...@@ -563,8 +563,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -563,8 +563,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (numOfPoints >= (pVnode->cfg.blocksPerMeter - 2) * pObj->pointsPerBlock) { if (numOfPoints >= (pVnode->cfg.blocksPerMeter - 2) * pObj->pointsPerBlock) {
code = TSDB_CODE_BATCH_SIZE_TOO_BIG; code = TSDB_CODE_BATCH_SIZE_TOO_BIG;
dError("vid:%d sid:%d id:%s, batch size too big, it shall be smaller than:%d", pObj->vnode, pObj->sid, dError("vid:%d sid:%d id:%s, batch size too big, insert points:%d, it shall be smaller than:%d", pObj->vnode, pObj->sid,
pObj->meterId, (pVnode->cfg.blocksPerMeter - 2) * pObj->pointsPerBlock); pObj->meterId, numOfPoints, (pVnode->cfg.blocksPerMeter - 2) * pObj->pointsPerBlock);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册