From 40c1d665b5481850c4a0e0604675f5eedf648e03 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Tue, 2 Jun 2020 18:12:33 +0800 Subject: [PATCH] fix some issues --- src/client/src/tscStream.c | 14 ++++++++++++++ src/common/inc/tdataformat.h | 1 - src/cq/src/cqMain.c | 13 +++++++------ src/dnode/inc/dnodeModule.h | 2 +- src/inc/tcq.h | 2 +- src/vnode/src/vnodeMain.c | 2 +- 6 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 3c7cd3e1eb..83a097f2f5 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -477,6 +477,14 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { } } +static void asyncCallback(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((SSqlObj *)param); + + pSql->res.code = code; + sem_post(&pSql->rspSem); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -521,7 +529,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } + pSql->param = pSql; + pSql->fp = asyncCallback; pRes->code = tscToSQLCmd(pSql, &SQLInfo); + if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) { + sem_wait(&pSql->rspSem); + } + SQLInfoDestroy(&SQLInfo); if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index e706f9a9d5..3a84ce8f6f 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -70,7 +70,6 @@ typedef struct { int numOfCols; // Number of columns appended int tlen; // maximum length of a SDataRow without the header part int flen; // First part length in a SDataRow after the header part - int32_t version; STColumn columns[]; } STSchema; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d96e0fe6a7..401d61b7a2 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -239,19 +239,19 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); + int32_t flen = 0; + for (int32_t i = 0; i < pSchema->numOfCols; i++) { + flen += TYPE_BYTES[pSchema->columns[i].type]; + } + // construct data - int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + flen; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); - int32_t flen = 0; - for (int32_t i = 0; i < pSchema->numOfCols; i++) { - flen += TYPE_BYTES[pSchema->columns[i].type]; - } - SDataRow trow = (SDataRow)pBlk->data; dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); @@ -279,5 +279,6 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { // write into vnode write queue pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); + free(buffer); } diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index fb529ee67c..6a6da0a2a5 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); void dnodeStartStream(); -void dnodeCleanUpModules(); +void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); #ifdef __cplusplus diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 32b75674c3..9d987da468 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -27,7 +27,7 @@ typedef struct { int vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_DB_NAME_LEN]; + char db[TSDB_DB_NAME_LEN + 1]; FCqWrite cqWrite; } SCqCfg; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 26ba392a76..e4f1f7d2f5 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "_root"); strcpy(cqCfg.pass, tsInternalPass); - strcpy(cqCfg.db, "db"); // TODO: replace hard coded db name + strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); -- GitLab