未验证 提交 f8cea1c5 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1507 from taosdata/feature/query

Feature/query
...@@ -12,6 +12,7 @@ rpms/ ...@@ -12,6 +12,7 @@ rpms/
mac/ mac/
*.pyc *.pyc
*.tmp *.tmp
*.swp
src/connector/nodejs/node_modules/ src/connector/nodejs/node_modules/
src/connector/nodejs/out/ src/connector/nodejs/out/
tests/test/ tests/test/
......
...@@ -357,7 +357,6 @@ typedef struct SSqlObj { ...@@ -357,7 +357,6 @@ typedef struct SSqlObj {
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
tsem_t emptyRspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint8_t numOfSubs; uint8_t numOfSubs;
...@@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql); ...@@ -409,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param); void tscQueueAsyncError(void(*fp), void *param, int32_t code);
int tscProcessLocalCmd(SSqlObj *pSql); int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg); int tscCfgDynamicOptions(char *msg);
...@@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj); ...@@ -450,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj); void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
......
...@@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) { void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -51,17 +51,15 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload"); tscError("failed to malloc payload");
tfree(pSql); tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
tscQueueAsyncError(fp, param);
return; return;
} }
pSql->sqlstr = malloc(sqlLen + 1); pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
free(pCmd->payload); free(pCmd->payload);
free(pSql);
return; return;
} }
...@@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const ...@@ -75,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = (uint8_t)code; pSql->res.code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa ...@@ -88,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj); tscError("bug!!! pObj:%p", pObj);
globalCode = TSDB_CODE_DISCONNECTED; terrno = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return; return;
} }
int32_t sqlLen = strlen(sqlstr); int32_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) { if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string too long"); tscError("sql string too long");
tscQueueAsyncError(fp, param); terrno = TSDB_CODE_INVALID_SQL;
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL);
return; return;
} }
...@@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa ...@@ -105,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) { if (pSql == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param); terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
return; return;
} }
...@@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -170,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes->code = numOfRows; pRes->code = numOfRows;
} }
tscQueueAsyncError(pSql->fetchFp, param); tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
return; return;
} }
...@@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -200,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlObj *pSql = (SSqlObj *)taosa; SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL"); tscError("sql object is NULL");
globalCode = TSDB_CODE_DISCONNECTED; // globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return; return;
} }
...@@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -210,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
return; return;
} }
...@@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -232,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
SSqlObj *pSql = (SSqlObj *)taosa; SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL"); tscError("sql object is NULL");
globalCode = TSDB_CODE_DISCONNECTED; // globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return; return;
} }
...@@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -242,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
tscQueueAsyncError(fp, param); tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
return; return;
} }
...@@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { ...@@ -331,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
// pCmd may be released, so cache pCmd->command // pCmd may be released, so cache pCmd->command
int cmd = pCmd->command; int cmd = pCmd->command;
int code = pRes->code ? -pRes->code : pRes->numOfRows; int code = pRes->code;// ? -pRes->code : pRes->numOfRows;
// in case of async insert, restore the user specified callback function // in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
...@@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { ...@@ -349,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
} }
} }
void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle; void (*fp)() = pMsg->ahandle;
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
(*fp)(pMsg->thandle, NULL, -1);
} }
void tscQueueAsyncError(void(*fp), void *param) { void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
int32_t* c = malloc(sizeof(int32_t));
*c = code;
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.fp = tscProcessAsyncError; schedMsg.fp = tscProcessAsyncError;
schedMsg.ahandle = fp; schedMsg.ahandle = fp;
schedMsg.thandle = param; schedMsg.thandle = param;
schedMsg.msg = NULL; schedMsg.msg = c;
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
...@@ -412,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -412,7 +414,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code != 0) { if (code != 0) {
pRes->code = code; pRes->code = code;
tscTrace("%p failed to renew tableMeta", pSql); tscTrace("%p failed to renew tableMeta", pSql);
tsem_post(&pSql->rspSem); // tsem_post(&pSql->rspSem);
} else { } else {
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d", tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->retry); pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
...@@ -424,7 +426,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -424,7 +426,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
code = tscSendMsgToServer(pSql); code = tscSendMsgToServer(pSql);
if (code != 0) { if (code != 0) {
pRes->code = code; pRes->code = code;
tsem_post(&pSql->rspSem); // tsem_post(&pSql->rspSem);
} }
} }
......
...@@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) { ...@@ -488,7 +488,6 @@ TAOS_STMT* taos_stmt_init(TAOS* taos) {
} }
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
......
...@@ -117,7 +117,7 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo); ...@@ -117,7 +117,7 @@ static int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo);
static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index); static int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index);
static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, static int32_t convertSyntaxTreeToExprTree(tExprNode **pExpr, tSQLExpr* pAst, int32_t* num,
SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo); SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo);
/* /*
...@@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -215,7 +215,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pQueryInfo->numOfTables == 0) { if (pQueryInfo->numOfTables == 0) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
} else { } else {
pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0]; pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
} }
pCmd->command = pInfo->type; pCmd->command = pInfo->type;
...@@ -1208,12 +1208,12 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1208,12 +1208,12 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo; SSqlBinaryExprInfo* pBinExprInfo = &pFuncExpr->binExprInfo;
tSQLSyntaxNode* pNode = NULL; tExprNode* pNode = NULL;
SColIndexEx* pColIndex = NULL; SColIndexEx* pColIndex = NULL;
int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pNode, pItem->pNode, &pBinExprInfo->numOfCols, &pColIndex, &pQueryInfo->exprsInfo); int32_t ret = convertSyntaxTreeToExprTree(&pNode, pItem->pNode, &pBinExprInfo->numOfCols, &pColIndex, &pQueryInfo->exprsInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tSQLBinaryExprDestroy(&pNode, NULL); tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause"); return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause");
} }
...@@ -5807,20 +5807,20 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { ...@@ -5807,20 +5807,20 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
return TSDB_CODE_SUCCESS; // Does not build query message here return TSDB_CODE_SUCCESS; // Does not build query message here
} }
static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* pAst, int32_t* num, static int32_t convertSyntaxTreeToExprTree(tExprNode **pExpr, tSQLExpr* pAst, int32_t* num,
SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo) { SColIndexEx** pColIndex, SSqlExprInfo* pExprInfo) {
tSQLSyntaxNode* pLeft = NULL; tExprNode* pLeft = NULL;
tSQLSyntaxNode* pRight= NULL; tExprNode* pRight= NULL;
if (pAst->pLeft != NULL) { if (pAst->pLeft != NULL) {
int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pLeft, pAst->pLeft, num, pColIndex, pExprInfo); int32_t ret = convertSyntaxTreeToExprTree(&pLeft, pAst->pLeft, num, pColIndex, pExprInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
} }
if (pAst->pRight != NULL) { if (pAst->pRight != NULL) {
int32_t ret = tSQLBinaryExprCreateFromSqlExpr(&pRight, pAst->pRight, num, pColIndex, pExprInfo); int32_t ret = convertSyntaxTreeToExprTree(&pRight, pAst->pRight, num, pColIndex, pExprInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
...@@ -5828,14 +5828,14 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* ...@@ -5828,14 +5828,14 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
if (pAst->pLeft == NULL) { if (pAst->pLeft == NULL) {
if (pAst->nSQLOptr >= TK_TINYINT && pAst->nSQLOptr <= TK_DOUBLE) { if (pAst->nSQLOptr >= TK_TINYINT && pAst->nSQLOptr <= TK_DOUBLE) {
*pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(tVariant)); *pExpr = calloc(1, sizeof(tExprNode) + sizeof(tVariant));
(*pExpr)->nodeType = TSQL_NODE_VALUE; (*pExpr)->nodeType = TSQL_NODE_VALUE;
(*pExpr)->pVal = (tVariant*) ((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); (*pExpr)->pVal = (tVariant*) ((char*)(*pExpr) + sizeof(tExprNode));
tVariantAssign((*pExpr)->pVal, &pAst->val); tVariantAssign((*pExpr)->pVal, &pAst->val);
} else if (pAst->nSQLOptr >= TK_COUNT && pAst->nSQLOptr <= TK_AVG_IRATE) { } else if (pAst->nSQLOptr >= TK_COUNT && pAst->nSQLOptr <= TK_AVG_IRATE) {
*pExpr = calloc(1, sizeof(tSQLSyntaxNode) + sizeof(SSchemaEx)); *pExpr = calloc(1, sizeof(tExprNode) + sizeof(SSchemaEx));
(*pExpr)->nodeType = TSQL_NODE_COL; (*pExpr)->nodeType = TSQL_NODE_COL;
(*pExpr)->pSchema = (SSchema*)((char*)(*pExpr) + sizeof(tSQLSyntaxNode)); (*pExpr)->pSchema = (SSchema*)((char*)(*pExpr) + sizeof(tExprNode));
strncpy((*pExpr)->pSchema->name, pAst->operand.z, pAst->operand.n); strncpy((*pExpr)->pSchema->name, pAst->operand.z, pAst->operand.n);
// set the input column data byte and type. // set the input column data byte and type.
...@@ -5855,7 +5855,7 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr* ...@@ -5855,7 +5855,7 @@ static int32_t tSQLBinaryExprCreateFromSqlExpr(tSQLSyntaxNode **pExpr, tSQLExpr*
strncpy((*pColIndex)[(*num) - 1].name, pAst->operand.z, pAst->operand.n); strncpy((*pColIndex)[(*num) - 1].name, pAst->operand.z, pAst->operand.n);
} else { } else {
*pExpr = (tSQLSyntaxNode *)calloc(1, sizeof(tSQLSyntaxNode)); *pExpr = (tExprNode *)calloc(1, sizeof(tExprNode));
(*pExpr)->_node.hasPK = false; (*pExpr)->_node.hasPK = false;
(*pExpr)->_node.pLeft = pLeft; (*pExpr)->_node.pLeft = pLeft;
(*pExpr)->_node.pRight = pRight; (*pExpr)->_node.pRight = pRight;
......
...@@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -285,14 +285,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pRes->rspType = rpcMsg->msgType; pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen; pRes->rspLen = rpcMsg->contLen;
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); if (pRes->rspLen > 0) {
if (tmp == NULL) { char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; if (tmp == NULL) {
} else { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
pRes->pRsp = tmp; } else {
if (pRes->rspLen) { pRes->pRsp = tmp;
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
} }
} else {
pRes->pRsp = NULL;
} }
// ignore the error information returned from mnode when set ignore flag in sql // ignore the error information returned from mnode when set ignore flag in sql
...@@ -327,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -327,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
/* /*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
......
...@@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { ...@@ -155,6 +155,10 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
assert(pObj != NULL && pObj->pSql != NULL); assert(pObj != NULL && pObj->pSql != NULL);
if (code < 0) {
pObj->pSql->res.code = code;
}
sem_post(&pObj->pSql->rspSem); sem_post(&pObj->pSql->rspSem);
} }
...@@ -177,17 +181,17 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -177,17 +181,17 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
sem_wait(&pSql->rspSem); sem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) { if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code;
taos_close(pObj); taos_close(pObj);
return NULL; return NULL;
} }
tscTrace("%p DB connection is opening", pObj); tscTrace("%p DB connection is opening", pObj);
// version compare only requires the first 3 segments of the version string // version compare only requires the first 3 segments of the version string
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
if (code != 0) { if (code != 0) {
pSql->res.code = code; terrno = code;
taos_close(pObj); taos_close(pObj);
return NULL; return NULL;
} else { } else {
...@@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -267,31 +271,29 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; assert(param != NULL);
assert(pObj != NULL && pObj->pSql != NULL); SSqlObj *pSql = ((STscObj *)param)->pSql;
sem_post(&pObj->pSql->rspSem); // valid error code is less than 0
if (code < 0) {
pSql->res.code = code;
}
sem_post(&pSql->rspSem);
} }
int taos_query(TAOS *taos, const char *sqlstr) { int taos_query(TAOS *taos, const char *sqlstr) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED; terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED;
} }
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); SSqlObj* pSql = pObj->pSql;
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pObj->pSql = pSql;
tsem_init(&pSql->rspSem, 0, 0);
int32_t sqlLen = strlen(sqlstr); size_t sqlLen = strlen(sqlstr);
doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
// wait for the callback function to post the semaphore // wait for the callback function to post the semaphore
sem_wait(&pSql->rspSem); sem_wait(&pSql->rspSem);
...@@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -649,12 +651,12 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
return pRes->tsrow; return pRes->tsrow;
} }
static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres; SSqlObj* pSql = (SSqlObj*) tres;
if (numOfRows < 0) { // set the error code if (numOfRows < 0) { // set the error code
pSql->res.code = -numOfRows; pSql->res.code = -numOfRows;
} }
sem_post(&pSql->rspSem); sem_post(&pSql->rspSem);
} }
...@@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -677,7 +679,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data // current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true && if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) { (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem); sem_wait(&pSql->rspSem);
} }
...@@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -754,20 +756,18 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pRes == NULL || pRes->qhandle == 0) { if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */ /* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (pSql->fp != NULL) {
STscObj* pObj = pSql->pTscObj; if (tscShouldFreeAsyncSqlObj(pSql)) {
tscFreeSqlObj(pSql);
if (pSql == pObj->pSql) {
pObj->pSql = NULL;
tscFreeSqlObj(pSql);
}
tscTrace("%p Async SqlObj is freed by app", pSql); tscTrace("%p Async SqlObj is freed by app", pSql);
} else if (keepCmd) {
tscFreeSqlResult(pSql);
} else { } else {
tscFreeSqlObjPartial(pSql); if (keepCmd) {
tscFreeSqlResult(pSql);
} else {
tscFreeSqlObjPartial(pSql);
}
} }
return; return;
} }
...@@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -793,7 +793,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
*/ */
if (pRes->code != TSDB_CODE_QUERY_CANCELLED && if (pRes->code != TSDB_CODE_QUERY_CANCELLED &&
((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL) || ((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL && pRes->completed == false) ||
(pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT && (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) { pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
...@@ -836,39 +836,37 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { ...@@ -836,39 +836,37 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
} }
} else { } else {
// if no free resource msg is sent to vnode, we free this object immediately. // if no free resource msg is sent to vnode, we free this object immediately.
bool free = tscShouldFreeAsyncSqlObj(pSql);
if (pSql->fp) { if (free) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
tscTrace("%p Async sql result is freed by app", pSql); tscTrace("%p Async sql result is freed by app", pSql);
} else if (keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else { } else {
tscFreeSqlObjPartial(pSql); if (keepCmd) {
tscTrace("%p sql result is freed", pSql); tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
} }
} }
} }
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
// todo should not be used in async query
int taos_errno(TAOS *taos) { int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
int code;
if (pObj == NULL || pObj->signature != pObj) return globalCode;
if ((int8_t)(pObj->pSql->res.code) == -1) if (pObj == NULL || pObj->signature != pObj) {
code = TSDB_CODE_OTHERS; return terrno;
else }
code = pObj->pSql->res.code;
return code; return pObj->pSql->res.code;
} }
//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
/* /*
* In case of invalid sql error, additional information is attached to explain * In case of invalid sql error, additional information is attached to explain
* why the sql is invalid * why the sql is invalid
...@@ -888,13 +886,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { ...@@ -888,13 +886,15 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
return z != NULL; return z != NULL;
} }
// todo should not be used in async model
char *taos_errstr(TAOS *taos) { char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) if (pObj == NULL || pObj->signature != pObj)
return (char*)tstrerror(globalCode); return (char*)tstrerror(terrno);
SSqlObj *pSql = pObj->pSql; SSqlObj* pSql = pObj->pSql;
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
return pSql->cmd.payload; return pSql->cmd.payload;
} else { } else {
......
...@@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -124,7 +124,6 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->sqlstr = sqlstr; pSql->sqlstr = sqlstr;
tsem_init(&pSql->rspSem, 0, 0); tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1; pRes->numOfRows = 1;
......
...@@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) { ...@@ -392,10 +392,10 @@ void freeSubqueryObj(SSqlObj* pSql) {
static void doQuitSubquery(SSqlObj* pParentSql) { static void doQuitSubquery(SSqlObj* pParentSql) {
freeSubqueryObj(pParentSql); freeSubqueryObj(pParentSql);
tsem_wait(&pParentSql->emptyRspSem); // tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem); // tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem); // tsem_post(&pParentSql->rspSem);
} }
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
...@@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -567,7 +567,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
freeSubqueryObj(pParentSql); freeSubqueryObj(pParentSql);
} }
tsem_post(&pParentSql->rspSem); // tsem_post(&pParentSql->rspSem);
} else { } else {
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal); tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
} }
...@@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -662,7 +662,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
} }
// wait for all subquery completed // wait for all subquery completed
tsem_wait(&pSql->rspSem); // tsem_wait(&pSql->rspSem);
// update the records for each subquery // update the records for each subquery
for(int32_t i = 0; i < pSql->numOfSubs; ++i) { for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
...@@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -797,10 +797,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
if (pParentSql->fp == NULL) { if (pParentSql->fp == NULL) {
tsem_wait(&pParentSql->emptyRspSem); // tsem_post(&pParentSql->rspSem);
tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem);
} else { } else {
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
...@@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { ...@@ -954,10 +951,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
} }
} }
tsem_post(&pSql->emptyRspSem); // tsem_wait(&pSql->rspSem);
tsem_wait(&pSql->rspSem);
tsem_post(&pSql->emptyRspSem);
if (pSql->numOfSubs <= 0) { if (pSql->numOfSubs <= 0) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
......
...@@ -40,15 +40,10 @@ void * tscQhandle; ...@@ -40,15 +40,10 @@ void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
int tsInsertHeadSize; int tsInsertHeadSize;
extern int tscEmbedded; int tscNumOfThreads;
int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tscMutex;
extern int tsTscEnableRecordSql; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
extern int tsNumOfLogLines;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void deltaToUtcInitOnce();
void tscCheckDiskUsage(void *para, void *unused) { void tscCheckDiskUsage(void *para, void *unused) {
taosGetDisk(); taosGetDisk();
...@@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -60,7 +55,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
char secretEncrypt[32] = {0}; char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
pthread_mutex_lock(&tscMutex);
if (pVnodeConn == NULL) { if (pVnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp; rpcInit.localIp = tsLocalIp;
...@@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -78,7 +72,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pVnodeConn = rpcOpen(&rpcInit); pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) { if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode"); tscError("failed to init connection to vnode");
pthread_mutex_unlock(&tscMutex);
return -1; return -1;
} }
} }
...@@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) { ...@@ -100,12 +93,10 @@ int32_t tscInitRpc(const char *user, const char *secret) {
pTscMgmtConn = rpcOpen(&rpcInit); pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) { if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt"); tscError("failed to init connection to mgmt");
pthread_mutex_unlock(&tscMutex);
return -1; return -1;
} }
} }
pthread_mutex_unlock(&tscMutex);
return 0; return 0;
} }
...@@ -113,7 +104,7 @@ void taos_init_imp() { ...@@ -113,7 +104,7 @@ void taos_init_imp() {
char temp[128]; char temp[128];
struct stat dirstat; struct stat dirstat;
pthread_mutex_init(&tscMutex, NULL); errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec()); srand(taosGetTimestampSec());
deltaToUtcInitOnce(); deltaToUtcInitOnce();
......
...@@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) { ...@@ -469,7 +469,7 @@ void tscFreeSqlObjPartial(SSqlObj* pSql) {
pSql->freed = 0; pSql->freed = 0;
tscFreeSqlCmdData(pCmd); tscFreeSqlCmdData(pCmd);
tscTrace("%p free sqlObj partial completed", pSql); tscTrace("%p partially free sqlObj completed", pSql);
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
...@@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -487,8 +487,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tfree(pCmd->payload); tfree(pCmd->payload);
pCmd->allocSize = 0; pCmd->allocSize = 0;
tsem_destroy(&pSql->rspSem);
free(pSql); free(pSql);
} }
...@@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) { ...@@ -820,7 +818,9 @@ void tscCloseTscObj(STscObj* pObj) {
taosTmrStopA(&(pObj->pTimer)); taosTmrStopA(&(pObj->pTimer));
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
sem_destroy(&pSql->rspSem);
pthread_mutex_destroy(&pObj->mutex); pthread_mutex_destroy(&pObj->mutex);
tscTrace("%p DB connection is closed", pObj); tscTrace("%p DB connection is closed", pObj);
tfree(pObj); tfree(pObj);
} }
...@@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { ...@@ -842,10 +842,9 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
if (pCmd->payload == NULL) { if (pCmd->payload == NULL) {
assert(pCmd->allocSize == 0); assert(pCmd->allocSize == 0);
pCmd->payload = (char*)malloc(size); pCmd->payload = (char*)calloc(1, size);
if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY; if (pCmd->payload == NULL) return TSDB_CODE_CLI_OUT_OF_MEMORY;
pCmd->allocSize = size; pCmd->allocSize = size;
memset(pCmd->payload, 0, pCmd->allocSize);
} else { } else {
if (pCmd->allocSize < size) { if (pCmd->allocSize < size) {
char* b = realloc(pCmd->payload, size); char* b = realloc(pCmd->payload, size);
...@@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) { ...@@ -853,6 +852,8 @@ int tscAllocPayload(SSqlCmd* pCmd, int size) {
pCmd->payload = b; pCmd->payload = b;
pCmd->allocSize = size; pCmd->allocSize = size;
} }
memset(pCmd->payload, 0, pCmd->payloadLen);
} }
//memset(pCmd->payload, 0, pCmd->allocSize); //memset(pCmd->payload, 0, pCmd->allocSize);
...@@ -1105,7 +1106,7 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) { ...@@ -1105,7 +1106,7 @@ void tscClearFieldInfo(SFieldInfo* pFieldInfo) {
for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) { for(int32_t i = 0; i < pFieldInfo->numOfOutputCols; ++i) {
if (pFieldInfo->pExpr[i] != NULL) { if (pFieldInfo->pExpr[i] != NULL) {
tSQLBinaryExprDestroy(&pFieldInfo->pExpr[i]->binExprInfo.pBinExpr, NULL); tExprTreeDestroy(&pFieldInfo->pExpr[i]->binExprInfo.pBinExpr, NULL);
tfree(pFieldInfo->pExpr[i]->binExprInfo.pReqColumns); tfree(pFieldInfo->pExpr[i]->binExprInfo.pReqColumns);
tfree(pFieldInfo->pExpr[i]); tfree(pFieldInfo->pExpr[i]);
} }
...@@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { ...@@ -1742,7 +1743,7 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
} }
STscObj* pTscObj = pSql->pTscObj; STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql) { if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql) {
return false; return false;
} }
...@@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST ...@@ -1929,7 +1930,6 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
} }
pTableMetaInfo->pTableMeta = pTableMeta; pTableMetaInfo->pTableMeta = pTableMeta;
// pTableMetaInfo->pMetricMeta = pMetricMeta;
pTableMetaInfo->numOfTags = numOfTags; pTableMetaInfo->numOfTags = numOfTags;
if (tags != NULL) { if (tags != NULL) {
...@@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro ...@@ -1963,7 +1963,7 @@ void doRemoveMeterMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro
} }
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) {
tscTrace("%p deref the metric/meter meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables);
int32_t index = pQueryInfo->numOfTables; int32_t index = pQueryInfo->numOfTables;
while (index >= 0) { while (index >= 0) {
......
...@@ -286,6 +286,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -286,6 +286,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg); dnodeProcessReadResult(pVnode, pMsg);
dnodeReleaseVnode(pVnode);
} }
} }
......
...@@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) { ...@@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) {
void (*callback)(int) = tharg; void (*callback)(int) = tharg;
timer_t timerId; timer_t timerId;
struct sigevent sevent; struct sigevent sevent = {0};
#ifdef _ALPINE #ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD; sevent.sigev_notify = SIGEV_THREAD;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef TDENGINE_TAST_H #ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H #define TDENGINE_TAST_H
#include <tbuffer.h>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -27,14 +28,14 @@ extern "C" { ...@@ -27,14 +28,14 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "tvariant.h" #include "tvariant.h"
struct tSQLSyntaxNode; struct tExprNode;
struct SSchema; struct SSchema;
struct tSkipList; struct tSkipList;
struct tSkipListNode; struct tSkipListNode;
enum { enum {
TSQL_NODE_EXPR = 0x1, TSQL_NODE_EXPR = 0x1,
TSQL_NODE_COL = 0x2, TSQL_NODE_COL = 0x2,
TSQL_NODE_VALUE = 0x4, TSQL_NODE_VALUE = 0x4,
}; };
...@@ -60,44 +61,41 @@ typedef struct SBinaryFilterSupp { ...@@ -60,44 +61,41 @@ typedef struct SBinaryFilterSupp {
void * pExtInfo; void * pExtInfo;
} SBinaryFilterSupp; } SBinaryFilterSupp;
typedef struct tSQLSyntaxNode { typedef struct tExprNode {
uint8_t nodeType; uint8_t nodeType;
union { union {
struct { struct {
uint8_t optr; // filter operator uint8_t optr; // filter operator
uint8_t hasPK; // 0: do not contain primary filter, 1: contain uint8_t hasPK; // 0: do not contain primary filter, 1: contain
void * info; // support filter operation on this expression only available for leaf node void * info; // support filter operation on this expression only available for leaf node
struct tSQLSyntaxNode *pLeft; // left child pointer struct tExprNode *pLeft; // left child pointer
struct tSQLSyntaxNode *pRight; // right child pointer struct tExprNode *pRight; // right child pointer
} _node; } _node;
struct SSchema *pSchema; struct SSchema *pSchema;
tVariant * pVal; tVariant * pVal;
}; };
} tSQLSyntaxNode; } tExprNode;
void tSQLBinaryExprFromString(tExprNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len);
typedef struct tQueryResultset { void tSQLBinaryExprToString(tExprNode *pExpr, char *dst, int32_t *len);
void ** pRes;
int64_t num;
} tQueryResultset;
void tSQLBinaryExprFromString(tSQLSyntaxNode **pExpr, SSchema *pSchema, int32_t numOfCols, char *src, int32_t len); void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
void tSQLBinaryExprToString(tSQLSyntaxNode *pExpr, char *dst, int32_t *len); void tSQLBinaryExprTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
void tSQLBinaryExprDestroy(tSQLSyntaxNode **pExprs, void (*fp)(void*)); void tSQLBinaryExprCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
void tSQLBinaryExprTraverse(tSQLSyntaxNode *pExpr, SSkipList *pSkipList, SArray *result, SBinaryFilterSupp *param);
void tSQLBinaryExprCalcTraverse(tSQLSyntaxNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, char *, int32_t)); char *(*cb)(void *, char *, int32_t));
void tSQLBinaryExprTrv(tSQLSyntaxNode *pExprs, int32_t *val, int16_t *ids); void tSQLBinaryExprTrv(tExprNode *pExprs, int32_t *val, int16_t *ids);
void tQueryResultClean(tQueryResultset *pRes);
uint8_t getBinaryExprOptr(SSQLToken *pToken); uint8_t getBinaryExprOptr(SSQLToken *pToken);
SBuffer exprTreeToBinary(tExprNode* pExprTree);
tExprNode* exprTreeFromBinary(const void* pBuf, size_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_QASTDEF_H #ifndef TDENGINE_QSQLPARSER_H
#define TDENGINE_QASTDEF_H #define TDENGINE_QSQLPARSER_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -329,4 +329,4 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); ...@@ -329,4 +329,4 @@ int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
} }
#endif #endif
#endif // TDENGINE_QASTDEF_H #endif // TDENGINE_QSQLPARSER_H
此差异已折叠。
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "hash.h" #include "hash.h"
#include "hashfunc.h" #include "hashfunc.h"
#include "os.h"
#include "shash.h"
#include "taosdef.h" #include "taosdef.h"
#include "tstoken.h" #include "tstoken.h"
#include "ttokendef.h" #include "ttokendef.h"
......
...@@ -2609,7 +2609,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2609,7 +2609,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle; tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
// check if query is killed or not set the status of query to pass the status check // check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
...@@ -5149,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { ...@@ -5149,7 +5148,7 @@ static void singleTableQueryImpl(SQInfo* pQInfo) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// group by normal column, sliding window query, interval query are handled by interval query processor // group by normal column, sliding window query, interval query are handled by interval query processor
if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
tableIntervalProcessor(pQInfo); tableIntervalProcessor(pQInfo);
} else { } else {
if (isFixedOutputQuery(pQuery)) { if (isFixedOutputQuery(pQuery)) {
...@@ -5461,7 +5460,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs ...@@ -5461,7 +5460,7 @@ static int32_t buildAirthmeticExprFromMsg(SSqlFunctionExpr *pExpr, SQueryTableMs
SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo; SSqlBinaryExprInfo *pBinaryExprInfo = &pExpr->binExprInfo;
SColumnInfo * pColMsg = pQueryMsg->colList; SColumnInfo * pColMsg = pQueryMsg->colList;
#if 0 #if 0
tSQLSyntaxNode* pBinExpr = NULL; tExprNode* pBinExpr = NULL;
SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols); SSchema* pSchema = toSchema(pQueryMsg, pColMsg, pQueryMsg->numOfCols);
dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz); dTrace("qmsg:%p create binary expr from string:%s", pQueryMsg, pExpr->pBase.arg[0].argValue.pz);
...@@ -5962,7 +5961,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -5962,7 +5961,7 @@ static void freeQInfo(SQInfo *pQInfo) {
if (pBinExprInfo->numOfCols > 0) { if (pBinExprInfo->numOfCols > 0) {
tfree(pBinExprInfo->pReqColumns); tfree(pBinExprInfo->pReqColumns);
tSQLBinaryExprDestroy(&pBinExprInfo->pBinExpr, NULL); tExprTreeDestroy(&pBinExprInfo->pBinExpr, NULL);
} }
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "ttime.h" #include "ttime.h"
#include "queryExecutor.h" #include "queryExecutor.h"
#include "queryUtil.h"
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size, int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRuntimeEnv, int32_t size,
int32_t threshold, int16_t type) { int32_t threshold, int16_t type) {
......
此差异已折叠。
...@@ -13,14 +13,15 @@ ...@@ -13,14 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <setjmp.h>
#ifndef TDENGINE_TBUFFER_H #ifndef TDENGINE_TBUFFER_H
#define TDENGINE_TBUFFER_H #define TDENGINE_TBUFFER_H
#include "setjmp.h"
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
/* /*
SBuffer can be used to read or write a buffer, but cannot be used for both SBuffer can be used to read or write a buffer, but cannot be used for both
...@@ -80,37 +81,33 @@ int main(int argc, char** argv) { ...@@ -80,37 +81,33 @@ int main(int argc, char** argv) {
*/ */
typedef struct { typedef struct {
jmp_buf jb; jmp_buf jb;
char* data; char* data;
size_t pos; size_t pos;
size_t size; size_t size;
} SBuffer; } SBuffer;
// common functions can be used in both read & write // common functions can be used in both read & write
#define tbufThrowError(buf, code) longjmp((buf)->jb, (code)) #define tbufThrowError(buf, code) longjmp((buf)->jb, (code))
size_t tbufTell(SBuffer* buf); size_t tbufTell(SBuffer* buf);
size_t tbufSeekTo(SBuffer* buf, size_t pos); size_t tbufSeekTo(SBuffer* buf, size_t pos);
size_t tbufSkip(SBuffer* buf, size_t size); size_t tbufSkip(SBuffer* buf, size_t size);
void tbufClose(SBuffer* buf, bool keepData); void tbufClose(SBuffer* buf, bool keepData);
// basic read functions // basic read functions
#define tbufBeginRead(buf, data, len) (((buf)->data = (char*)data), ((buf)->pos = 0), ((buf)->size = ((data) == NULL) ? 0 : (len)), setjmp((buf)->jb)) #define tbufBeginRead(buf, _data, len) ((buf)->data = (char*)(_data), ((buf)->pos = 0), ((buf)->size = ((_data) == NULL) ? 0 : (len)), setjmp((buf)->jb))
char* tbufRead(SBuffer* buf, size_t size); char* tbufRead(SBuffer* buf, size_t size);
void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size); void tbufReadToBuffer(SBuffer* buf, void* dst, size_t size);
const char* tbufReadString(SBuffer* buf, size_t* len); const char* tbufReadString(SBuffer* buf, size_t* len);
size_t tbufReadToString(SBuffer* buf, char* dst, size_t size); size_t tbufReadToString(SBuffer* buf, char* dst, size_t size);
// basic write functions // basic write functions
#define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb)) #define tbufBeginWrite(buf) ((buf)->data = NULL, ((buf)->pos = 0), ((buf)->size = 0), setjmp((buf)->jb))
void tbufEnsureCapacity(SBuffer* buf, size_t size); void tbufEnsureCapacity(SBuffer* buf, size_t size);
char* tbufGetData(SBuffer* buf, bool takeOver); char* tbufGetData(SBuffer* buf, bool takeOver);
void tbufWrite(SBuffer* buf, const void* data, size_t size); void tbufWrite(SBuffer* buf, const void* data, size_t size);
void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size); void tbufWriteAt(SBuffer* buf, size_t pos, const void* data, size_t size);
void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len); void tbufWriteStringLen(SBuffer* buf, const char* str, size_t len);
void tbufWriteString(SBuffer* buf, const char* str); void tbufWriteString(SBuffer* buf, const char* str);
// read & write function for primitive types // read & write function for primitive types
#ifndef TBUFFER_DEFINE_FUNCTION #ifndef TBUFFER_DEFINE_FUNCTION
...@@ -120,17 +117,21 @@ void tbufWriteString(SBuffer* buf, const char* str); ...@@ -120,17 +117,21 @@ void tbufWriteString(SBuffer* buf, const char* str);
void tbufWrite##name##At(SBuffer* buf, size_t pos, type data); void tbufWrite##name##At(SBuffer* buf, size_t pos, type data);
#endif #endif
TBUFFER_DEFINE_FUNCTION( bool, Bool ) TBUFFER_DEFINE_FUNCTION(bool, Bool)
TBUFFER_DEFINE_FUNCTION( char, Char ) TBUFFER_DEFINE_FUNCTION(char, Char)
TBUFFER_DEFINE_FUNCTION( int8_t, Int8 ) TBUFFER_DEFINE_FUNCTION(int8_t, Int8)
TBUFFER_DEFINE_FUNCTION( uint8_t, Unt8 ) TBUFFER_DEFINE_FUNCTION(uint8_t, Unt8)
TBUFFER_DEFINE_FUNCTION( int16_t, Int16 ) TBUFFER_DEFINE_FUNCTION(int16_t, Int16)
TBUFFER_DEFINE_FUNCTION( uint16_t, Uint16 ) TBUFFER_DEFINE_FUNCTION(uint16_t, Uint16)
TBUFFER_DEFINE_FUNCTION( int32_t, Int32 ) TBUFFER_DEFINE_FUNCTION(int32_t, Int32)
TBUFFER_DEFINE_FUNCTION( uint32_t, Uint32 ) TBUFFER_DEFINE_FUNCTION(uint32_t, Uint32)
TBUFFER_DEFINE_FUNCTION( int64_t, Int64 ) TBUFFER_DEFINE_FUNCTION(int64_t, Int64)
TBUFFER_DEFINE_FUNCTION( uint64_t, Uint64 ) TBUFFER_DEFINE_FUNCTION(uint64_t, Uint64)
TBUFFER_DEFINE_FUNCTION( float, Float ) TBUFFER_DEFINE_FUNCTION(float, Float)
TBUFFER_DEFINE_FUNCTION( double, Double ) TBUFFER_DEFINE_FUNCTION(double, Double)
#ifdef __cplusplus
}
#endif
#endif #endif
\ No newline at end of file
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
tbufWriteAt(buf, pos, &data, sizeof(data));\ tbufWriteAt(buf, pos, &data, sizeof(data));\
} }
#include "../inc/tbuffer.h" #include "tbuffer.h"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -119,13 +119,14 @@ void tbufEnsureCapacity(SBuffer* buf, size_t size) { ...@@ -119,13 +119,14 @@ void tbufEnsureCapacity(SBuffer* buf, size_t size) {
} }
char* tbufGetData(SBuffer* buf, bool takeOver) { char* tbufGetData(SBuffer* buf, bool takeOver) {
char* ret = buf->data; char* ret = buf->data;
if (takeOver) { if (takeOver) {
buf->pos = 0; buf->pos = 0;
buf->size = 0; buf->size = 0;
buf->data = NULL; buf->data = NULL;
} }
return ret;
return ret;
} }
void tbufEndWrite(SBuffer* buf) { void tbufEndWrite(SBuffer* buf) {
......
...@@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct ...@@ -147,6 +147,11 @@ void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direct
} }
void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) { void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
if (pIter->numOfFGroups == 0) {
assert(pIter->pFileGroup == NULL);
return;
}
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags); void *ptr = taosbsearch(&fid, pIter->base, sizeof(SFileGroup), pIter->numOfFGroups, compFGroupKey, flags);
if (ptr == NULL) { if (ptr == NULL) {
......
...@@ -424,39 +424,48 @@ static SDataBlockInfo getTrueDataBlockInfo(STsdbQueryHandle* pHandle, STableChec ...@@ -424,39 +424,48 @@ static SDataBlockInfo getTrueDataBlockInfo(STsdbQueryHandle* pHandle, STableChec
SArray *getDefaultLoadColumns(STsdbQueryHandle *pQueryHandle, bool loadTS); SArray *getDefaultLoadColumns(STsdbQueryHandle *pQueryHandle, bool loadTS);
static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCols, SArray *sa); static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCols, SArray *sa);
static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { static bool doLoadDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
SQueryFilePos *cur = &pQueryHandle->cur; SQueryFilePos *cur = &pQueryHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
SArray *sa = getDefaultLoadColumns(pQueryHandle, true); SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
if (pQueryHandle->window.ekey < pBlock->keyLast) {
SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols);
data->numOfCols = pBlock->numOfCols; data->numOfCols = pBlock->numOfCols;
data->uid = pCheckInfo->pTableObj->tableId.uid; data->uid = pCheckInfo->pTableObj->tableId.uid;
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema);
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA]; SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
if (pFile->fd == FD_INITIALIZER) { if (pFile->fd == FD_INITIALIZER) {
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
} }
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data);
//do something }
}
static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) {
SQueryFilePos *cur = &pQueryHandle->cur;
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
// query ended in current block
if (pQueryHandle->window.ekey < pBlock->keyLast) {
doLoadDataFromFileBlock(pQueryHandle);
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
} }
} else { } else {// todo desc query
if (pQueryHandle->window.ekey > pBlock->keyFirst) { if (pQueryHandle->window.ekey > pBlock->keyFirst) {
// loadDataBlockIntoMem_(pQueryHandle, pBlock, &pQueryHandle->pFields[cur->slot], cur->fileId, sa); //
} }
} }
filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa);
return pQueryHandle->realNumOfRows > 0; return pQueryHandle->realNumOfRows > 0;
} }
...@@ -508,7 +517,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { ...@@ -508,7 +517,7 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) {
// next block in the same file // next block in the same file
cur->slot += step; cur->slot += step;
SCompBlock* pBlock = &pQueryHandle->pBlock[cur->slot]; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1; cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
return loadQualifiedDataFromFileBlock(pQueryHandle); return loadQualifiedDataFromFileBlock(pQueryHandle);
} }
...@@ -736,12 +745,23 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -736,12 +745,23 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
int32_t index = -1; int32_t index = -1;
int32_t tid = pCheckInfo->tableId.tid; int32_t tid = pCheckInfo->tableId.tid;
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
while (1) { while (pCheckInfo->pFileGroup != NULL) {
if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) {
break; break;
} }
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
// no data block in current file, try next
if (pCheckInfo->compIndex[tid].numOfSuperBlocks == 0) {
dTrace("QInfo:%p no data block in file, fid:%d, tid:%d, try next", pQueryHandle->qinfo,
pCheckInfo->pFileGroup->fileId, tid);
pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter);
continue;
}
index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key); index = binarySearchForBlockImpl(pCheckInfo->pCompInfo->blocks, pCheckInfo->compIndex[tid].numOfSuperBlocks, pQueryHandle->order, key);
...@@ -769,12 +789,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -769,12 +789,11 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
// load first data block into memory failed, caused by disk block error // load first data block into memory failed, caused by disk block error
bool blockLoaded = false; bool blockLoaded = false;
SArray *sa = NULL; SArray *sa = getDefaultLoadColumns(pQueryHandle, true);
// todo no need to loaded at all // todo no need to loaded at all
cur->slot = index; cur->slot = index;
sa = getDefaultLoadColumns(pQueryHandle, true);
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols); SCompData* data = calloc(1, sizeof(SCompData)+ sizeof(SCompCol)*pBlock->numOfCols);
...@@ -784,6 +803,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf ...@@ -784,6 +803,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf
pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096); pCheckInfo->pDataCols = tdNewDataCols(1000, 2, 4096);
tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema); tdInitDataCols(pCheckInfo->pDataCols, pCheckInfo->pTableObj->schema);
SFile* pFile = &pCheckInfo->pFileGroup->files[TSDB_FILE_TYPE_DATA];
if (pFile->fd == FD_INITIALIZER) { if (pFile->fd == FD_INITIALIZER) {
pFile->fd = open(pFile->fname, O_RDONLY); pFile->fd = open(pFile->fname, O_RDONLY);
} }
...@@ -973,11 +993,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList ...@@ -973,11 +993,16 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
SDataBlockInfo binfo = getTrueDataBlockInfo(pHandle, pCheckInfo); SDataBlockInfo binfo = getTrueDataBlockInfo(pHandle, pCheckInfo);
if (pHandle->realNumOfRows <= binfo.size) { assert(pHandle->realNumOfRows <= binfo.size);
if (pHandle->realNumOfRows < binfo.size) {
return pHandle->pColumns; return pHandle->pColumns;
} else { } else {
// todo do load data block SArray *sa = getDefaultLoadColumns(pHandle, true);
assert(0);
doLoadDataFromFileBlock(pHandle);
filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa);
return pHandle->pColumns;
} }
} }
} }
...@@ -1197,7 +1222,7 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS ...@@ -1197,7 +1222,7 @@ static void getTagColumnInfo(SSyntaxTreeFilterSupporter* pSupporter, SSchema* pS
} }
void filterPrepare(void* expr, void* param) { void filterPrepare(void* expr, void* param) {
tSQLSyntaxNode *pExpr = (tSQLSyntaxNode*) expr; tExprNode *pExpr = (tExprNode*) expr;
if (pExpr->_node.info != NULL) { if (pExpr->_node.info != NULL) {
return; return;
} }
...@@ -1276,7 +1301,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { ...@@ -1276,7 +1301,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond) { static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond) {
STColumn* stcol = schemaColAt(pSTable->tagSchema, 0); STColumn* stcol = schemaColAt(pSTable->tagSchema, 0);
tSQLSyntaxNode* pExpr = NULL; tExprNode* pExpr = NULL;
tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond)); tSQLBinaryExprFromString(&pExpr, stcol, schemaNCols(pSTable->tagSchema), pCond, strlen(pCond));
// failed to build expression, no result, return immediately // failed to build expression, no result, return immediately
...@@ -1297,7 +1322,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond ...@@ -1297,7 +1322,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond
}; };
tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp); tSQLBinaryExprTraverse(pExpr, pSTable->pIndex, pRes, &supp);
tSQLBinaryExprDestroy(&pExpr, tSQLListTraverseDestroyInfo); tExprTreeDestroy(&pExpr, tSQLListTraverseDestroyInfo);
tansformQueryResult(pRes); tansformQueryResult(pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册