未验证 提交 14ba2ffb 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3719 from taosdata/feature/http

Feature/http
......@@ -406,7 +406,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start
return TSDB_CODE_SUCCESS;
}
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, char *error,
int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[], SParsedDataColInfo *spd, SSqlCmd* pCmd,
int16_t timePrec, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
SStrToken sToken = {0};
......@@ -426,12 +426,17 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
*str += index;
if (sToken.type == TK_QUESTION) {
if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
*code = tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
return -1;
}
uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
continue;
}
strcpy(error, "client out of memory");
strcpy(pCmd->payload, "client out of memory");
*code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
......@@ -439,8 +444,7 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || (sToken.n == 0) || (type == TK_RP)) {
tscSQLSyntaxErrMsg(error, "invalid data or symbol", sToken.z);
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
*code = tscSQLSyntaxErrMsg(pCmd->payload, "invalid data or symbol", sToken.z);
return -1;
}
......@@ -470,14 +474,14 @@ int tsParseOneRowData(char **str, STableDataBlocks *pDataBlocks, SSchema schema[
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, error, str, isPrimaryKey, timePrec);
int32_t ret = tsParseOneColumnData(pSchema, &sToken, start, pCmd->payload, str, isPrimaryKey, timePrec);
if (ret != TSDB_CODE_SUCCESS) {
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1; // NOTE: here 0 mean error!
}
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) {
tscInvalidSQLErrMsg(error, "client time/server time can not be mixed up", sToken.z);
tscInvalidSQLErrMsg(pCmd->payload, "client time/server time can not be mixed up", sToken.z);
*code = TSDB_CODE_TSC_INVALID_TIME_STAMP;
return -1;
}
......@@ -522,7 +526,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
}
int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMeta, int maxRows,
SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
SParsedDataColInfo *spd, SSqlCmd* pCmd, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0;
SStrToken sToken;
......@@ -534,8 +538,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
int32_t precision = tinfo.precision;
if (spd->hasVal[0] == false) {
strcpy(error, "primary timestamp column can not be null");
*code = TSDB_CODE_TSC_INVALID_SQL;
*code = tscInvalidSQLErrMsg(pCmd->payload, "primary timestamp column can not be null", *str);
return -1;
}
......@@ -547,17 +550,17 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
*str += index;
if (numOfRows >= maxRows || pDataBlock->size + tinfo.rowSize >= pDataBlock->nAllocSize) {
int32_t tSize;
int32_t retcode = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
if (retcode != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(error, "client out of memory");
*code = retcode;
*code = tscAllocateMemIfNeed(pDataBlock, tinfo.rowSize, &tSize);
if (*code != TSDB_CODE_SUCCESS) { //TODO pass the correct error code to client
strcpy(pCmd->payload, "client out of memory");
return -1;
}
ASSERT(tSize > maxRows);
maxRows = tSize;
}
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, error, precision, code, tmpTokenBuf);
int32_t len = tsParseOneRowData(str, pDataBlock, pSchema, spd, pCmd, precision, code, tmpTokenBuf);
if (len <= 0) { // error message has been set in tsParseOneRowData
return -1;
}
......@@ -568,7 +571,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
sToken = tStrGetToken(*str, &index, false, 0, NULL);
*str += index;
if (sToken.n == 0 || sToken.type != TK_RP) {
tscSQLSyntaxErrMsg(error, ") expected", *str);
tscSQLSyntaxErrMsg(pCmd->payload, ") expected", *str);
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1;
}
......@@ -577,7 +580,7 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pTableMe
}
if (numOfRows <= 0) {
strcpy(error, "no any data points");
strcpy(pCmd->payload, "no any data points");
*code = TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
return -1;
} else {
......@@ -704,7 +707,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd->payload, &code, tmpTokenBuf);
int32_t numOfRows = tsParseValues(str, dataBuf, pTableMeta, maxNumOfRows, spd, pCmd, &code, tmpTokenBuf);
free(tmpTokenBuf);
if (numOfRows <= 0) {
return code;
......@@ -724,10 +727,6 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableList, char **st
dataBuf->vgId = pTableMeta->vgroupInfo.vgId;
dataBuf->numOfTables = 1;
/*
* the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
* which is actually returned from server.
*/
*totalNum += numOfRows;
return TSDB_CODE_SUCCESS;
}
......@@ -1458,8 +1457,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
char *lineptr = line;
strtolower(line, line);
int32_t len =
tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
int32_t len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd, tinfo.precision, &code, tokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code;
break;
......
......@@ -43,10 +43,6 @@ typedef struct SNormalStmt {
tVariant* params;
} SNormalStmt;
//typedef struct SInsertStmt {
//
//} SInsertStmt;
typedef struct STscStmt {
bool isInsert;
STscObj* taos;
......@@ -54,7 +50,6 @@ typedef struct STscStmt {
SNormalStmt normal;
} STscStmt;
static int normalStmtAddPart(SNormalStmt* stmt, bool isParam, char* str, uint32_t len) {
uint16_t size = stmt->numParts + 1;
if (size > stmt->sizeParts) {
......@@ -514,7 +509,6 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
SSqlObj* pSql = pStmt->pSql;
size_t sqlLen = strlen(sql);
//doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pSql->param = (void*) pSql;
......
......@@ -246,6 +246,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")
......
......@@ -185,7 +185,11 @@ static int32_t sdbInitWal() {
}
sdbInfo("open sdb wal for restore");
walRestore(tsSdbObj.wal, NULL, sdbWrite);
int code = walRestore(tsSdbObj.wal, NULL, sdbWrite);
if (code != TSDB_CODE_SUCCESS) {
sdbError("failed to open wal for restore, reason:%s", tstrerror(code));
return -1;
}
return 0;
}
......
......@@ -697,22 +697,41 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
if (pCheckInfo->pDataCols == NULL) {
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return terrno;
goto _error;
}
}
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
tdInitDataCols(pCheckInfo->pDataCols, pSchema);
tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
int32_t code = tdInitDataCols(pCheckInfo->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
code = tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %p", pQueryHandle, pQueryHandle->qinfo);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _error;
}
int16_t* colIds = pQueryHandle->defaultLoadColumn->pData;
int32_t ret = tsdbLoadBlockDataCols(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, (int)(QH_GET_NUM_OF_COLS(pQueryHandle)));
if (ret != TSDB_CODE_SUCCESS) {
return terrno;
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
goto _error;
}
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
......@@ -729,10 +748,16 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
int64_t elapsedTime = (taosGetTimestampUs() - st);
pQueryHandle->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64" , rows:%d, elapsed time:%"PRId64 " us, %p",
tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pQueryHandle->qinfo);
return TSDB_CODE_SUCCESS;
_error:
pBlock->numOfRows = 0;
tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %p",
pQueryHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pQueryHandle->qinfo);
return terrno;
}
static int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo);
......
......@@ -385,9 +385,10 @@ static void walRelease(SWal *pWal) {
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name;
int size = 1024 * 1024; // default 1M buffer size
terrno = 0;
char *buffer = malloc(1024000); // size for one record
char *buffer = malloc(size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
......@@ -395,7 +396,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
SWalHead *pHead = (SWalHead *)buffer;
int fd = open(name, O_RDONLY);
int fd = open(name, O_RDWR);
if (fd < 0) {
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
......@@ -405,29 +406,58 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
wDebug("wal:%s, start to restore", name);
size_t offset = 0;
while (1) {
int ret = taosTRead(fd, pHead, sizeof(SWalHead));
if ( ret == 0) break;
if (ret == 0) break;
if (ret != sizeof(SWalHead)) {
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
if (ret < 0) {
wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < sizeof(SWalHead)) {
wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
terrno = TAOS_SYSTEM_ERROR(errno);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(false);
break;
}
}
if (pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len;
buffer = realloc(buffer, size);
if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
pHead = (SWalHead *)buffer;
}
ret = taosTRead(fd, pHead->cont, pHead->len);
if ( ret != pHead->len) {
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
if (ret < 0) {
wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
taosFtruncate(fd, offset);
fsync(fd);
break;
}
offset = offset + sizeof(SWalHead) + pHead->len;
if (pWal->keep) pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册