未验证 提交 07170492 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2448 from taosdata/feature/query

Feature/query
...@@ -171,11 +171,7 @@ typedef struct STableDataBlocks { ...@@ -171,11 +171,7 @@ typedef struct STableDataBlocks {
* to avoid it to be removed from cache * to avoid it to be removed from cache
*/ */
STableMeta *pTableMeta; STableMeta *pTableMeta;
char *pData;
union {
char *filename;
char *pData;
};
// for parameter ('?') binding // for parameter ('?') binding
uint32_t numOfAllocedParams; uint32_t numOfAllocedParams;
...@@ -398,7 +394,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; ...@@ -398,7 +394,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ;
int doAsyncParseSql(SSqlObj* pSql); int doAsyncParseSql(SSqlObj* pSql);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql);
void tscKillSTableQuery(SSqlObj *pSql); void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(SSqlObj* pSql); bool tscIsUpdateQuery(SSqlObj* pSql);
......
...@@ -1153,29 +1153,19 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1153,29 +1153,19 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _error; goto _error;
} }
char fname[PATH_MAX] = {0}; strncpy(pCmd->payload, sToken.z, sToken.n);
strncpy(fname, sToken.z, sToken.n); strdequote(pCmd->payload);
strdequote(fname);
// todo refactor extract method
wordexp_t full_path; wordexp_t full_path;
if (wordexp(fname, &full_path, 0) != 0) { if (wordexp(pCmd->payload, &full_path, 0) != 0) {
code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z); code = tscInvalidSQLErrMsg(pCmd->payload, "invalid filename", sToken.z);
goto _error; goto _error;
} }
strcpy(fname, full_path.we_wordv[0]);
wordfree(&full_path);
STableDataBlocks *pDataBlock = NULL; tstrncpy(pCmd->payload, full_path.we_wordv[0], pCmd->allocSize);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; wordfree(&full_path);
int32_t ret = tscCreateDataBlock(PATH_MAX, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name,
pTableMeta, &pDataBlock);
if (ret != TSDB_CODE_SUCCESS) {
goto _error;
}
taosArrayPush(pCmd->pDataBlocks, &pDataBlock);
strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) { } else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
...@@ -1377,172 +1367,141 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1377,172 +1367,141 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code; return code;
} }
if ((code = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { return tscProcessSql(pSql);
return code; }
typedef struct SImportFileSupport {
SSqlObj *pSql;
FILE *fp;
} SImportFileSupport;
static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
assert(param != NULL && tres != NULL);
SSqlObj *pSql = tres;
SSqlCmd *pCmd = &pSql->cmd;
SImportFileSupport *pSupporter = (SImportFileSupport *) param;
SSqlObj *pParentSql = pSupporter->pSql;
FILE *fp = pSupporter->fp;
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { // handle error
assert(taos_errno(pSql) == code);
taos_free_result(pSql);
tfree(pSupporter);
fclose(fp);
pParentSql->res.code = code;
return;
} }
return TSDB_CODE_SUCCESS; // accumulate the total submit records
} pParentSql->res.numOfRows += pSql->res.numOfRows;
static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
size_t readLen = 0;
char * line = NULL;
size_t n = 0;
int len = 0;
int32_t maxRows = 0;
SSqlCmd * pCmd = &pSql->cmd;
int numOfRows = 0;
int32_t code = 0;
int nrows = 0;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta); SSchema * pSchema = tscGetTableSchema(pTableMeta);
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
assert(pCmd->numOfClause == 1);
int32_t rowSize = tinfo.rowSize;
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
STableDataBlocks *pTableDataBlock = NULL; tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int32_t count = 0;
int32_t maxRows = 0;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); tscDestroyBlockArrayList(pSql->cmd.pDataBlocks);
pCmd->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, tinfo.rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; // return ret;
} }
taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock); taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock);
tscAllocateMemIfNeed(pTableDataBlock, tinfo.rowSize, &maxRows);
code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows); char *tokenBuf = calloc(1, 4096);
if (TSDB_CODE_SUCCESS != code) return -1;
int count = 0;
SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns};
SSchema * pSchema = tscGetTableSchema(pTableMeta);
tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns);
while ((readLen = getline(&line, &n, fp)) != -1) { while ((readLen = getline(&line, &n, fp)) != -1) {
// line[--readLen] = '\0'; if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) line[--readLen] = 0; line[--readLen] = 0;
if (readLen == 0) continue; // fang, <= to == }
if (readLen == 0) {
continue;
}
char *lineptr = line; char *lineptr = line;
strtolower(line, line); strtolower(line, line);
len = tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tmpTokenBuf); int32_t len =
tsParseOneRowData(&lineptr, pTableDataBlock, pSchema, &spd, pCmd->payload, tinfo.precision, &code, tokenBuf);
if (len <= 0 || pTableDataBlock->numOfParams > 0) { if (len <= 0 || pTableDataBlock->numOfParams > 0) {
pSql->res.code = code; pSql->res.code = code;
return (-code); break;
} }
pTableDataBlock->size += len; pTableDataBlock->size += len;
count++; if (++count >= maxRows) {
nrows++; break;
if (count >= maxRows) {
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
return -code;
}
pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
pTableDataBlock->size = sizeof(SSubmitBlk);
pTableDataBlock->rowSize = tinfo.rowSize;
numOfRows += pSql->res.numOfRows;
pSql->res.numOfRows = 0;
count = 0;
} }
} }
tfree(tokenBuf);
free(line);
if (count > 0) { if (count > 0) {
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
return -code; pParentSql->res.code = code;
} }
numOfRows += pSql->res.numOfRows; } else {
pSql->res.numOfRows = 0; taos_free_result(pSql);
} tfree(pSupporter);
fclose(fp);
if (line) tfree(line); pParentSql->fp = pParentSql->fetchFp;
return numOfRows; // all data has been sent to vnode, call user function
int32_t v = (pParentSql->res.code != TSDB_CODE_SUCCESS) ? pParentSql->res.code : pParentSql->res.numOfRows;
(*pParentSql->fp)(pParentSql->param, pParentSql, v);
}
} }
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
if (pCmd->command != TSDB_SQL_INSERT) { if (pCmd->command != TSDB_SQL_INSERT) {
return; return;
} }
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableDataBlocks *pDataBlock = NULL;
int32_t affected_rows = 0;
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL);
SArray *pDataBlockList = pCmd->pDataBlocks;
pCmd->pDataBlocks = NULL;
char path[PATH_MAX] = {0};
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; ++i) {
pDataBlock = taosArrayGetP(pDataBlockList, i );
if (pDataBlock == NULL) {
continue;
}
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) {
tscError("%p failed to malloc when insert file", pSql);
continue;
}
pCmd->count = 1;
tstrncpy(path, pDataBlock->filename, sizeof(path));
FILE *fp = fopen(path, "r");
if (fp == NULL) {
tscError("%p failed to open file %s to load data from file, reason:%s", pSql, path, strerror(errno));
continue;
}
tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name));
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
int32_t ret = tscGetTableMeta(pSql, pTableMetaInfo);
if (ret != TSDB_CODE_SUCCESS) {
tscError("%p get meter meta failed, abort", pSql);
continue;
}
char *tmpTokenBuf = calloc(1, 4096); // used for deleting Escape character: \\, \', \" SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
if (NULL == tmpTokenBuf) { SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
tscError("%p calloc failed", pSql);
continue;
}
int nrows = tscInsertDataFromFile(pSql, fp, tmpTokenBuf); pNew->cmd.pDataBlocks = taosArrayInit(4, POINTER_BYTES);
free(tmpTokenBuf); pCmd->count = 1;
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
if (nrows < 0) { FILE *fp = fopen(pCmd->payload, "r");
fclose(fp); if (fp == NULL) {
tscTrace("%p no records(%d) in file %s", pSql, nrows, path); pSql->res.code = TAOS_SYSTEM_ERROR(errno);
continue; tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));
}
fclose(fp); tfree(pSupporter)
affected_rows += nrows; tscQueueAsyncRes(pSql);
tscTrace("%p Insert data %d records from file %s", pSql, nrows, path); return;
} }
pSql->res.numOfRows = affected_rows; pSupporter->pSql = pSql;
pSupporter->fp = fp;
// all data have been submit to vnode, release data blocks parseFileSendDataBlock(pSupporter, pNew, 0);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscDestroyBlockArrayList(pDataBlockList);
} }
...@@ -195,7 +195,11 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -195,7 +195,11 @@ int tscSendMsgToServer(SSqlObj *pSql) {
.code = 0 .code = 0
}; };
pSql->pRpcCtx = rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg); // NOTE: the rpc context should be acquired before sending data to server.
// Otherwise, the pSql object may have been released already during the response function, which is
// processMsgFromServer function. In the meanwhile, the assignment of the rpc context to sql object will absolutely
// cause crash.
/*pSql->pRpcCtx = */rpcSendRequest(pObj->pDnodeConn, &pSql->ipList, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1880,7 +1880,7 @@ void tscDoQuery(SSqlObj* pSql) { ...@@ -1880,7 +1880,7 @@ void tscDoQuery(SSqlObj* pSql) {
} }
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscProcessMultiVnodesInsertFromFile(pSql); tscProcessMultiVnodesImportFromFile(pSql);
} else { } else {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type; uint16_t type = pQueryInfo->type;
......
...@@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -245,7 +245,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE
#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE
#define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MAX_VNODES 256 #define TSDB_MAX_VNODES 256
......
...@@ -470,11 +470,12 @@ int main(int argc, char *argv[]) { ...@@ -470,11 +470,12 @@ int main(int argc, char *argv[]) {
char command[BUFFER_SIZE] = "\0"; char command[BUFFER_SIZE] = "\0";
sprintf(command, "drop database %s;", db_name); sprintf(command, "drop database %s;", db_name);
taos_query(taos, command); TAOS_RES* res = taos_query(taos, command);
taos_free_result(res);
sprintf(command, "create database %s;", db_name); sprintf(command, "create database %s;", db_name);
taos_query(taos, command); res = taos_query(taos, command);
taos_free_result(res);
char cols[STRING_LEN] = "\0"; char cols[STRING_LEN] = "\0";
int colIndex = 0; int colIndex = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册