diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 3c7cd3e1ebeaf83bdf10182247ec6b62d1dee693..83a097f2f57793d062427df6d19f92ca84eadf46 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -477,6 +477,14 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { } } +static void asyncCallback(void *param, TAOS_RES *tres, int code) { + assert(param != NULL); + SSqlObj *pSql = ((SSqlObj *)param); + + pSql->res.code = code; + sem_post(&pSql->rspSem); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -521,7 +529,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return NULL; } + pSql->param = pSql; + pSql->fp = asyncCallback; pRes->code = tscToSQLCmd(pSql, &SQLInfo); + if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) { + sem_wait(&pSql->rspSem); + } + SQLInfoDestroy(&SQLInfo); if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index e706f9a9d51357f4a4bfaa4731300a1a14baa8c8..3a84ce8f6f3ad369e9f6972d79d3356451c4ef25 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -70,7 +70,6 @@ typedef struct { int numOfCols; // Number of columns appended int tlen; // maximum length of a SDataRow without the header part int flen; // First part length in a SDataRow after the header part - int32_t version; STColumn columns[]; } STSchema; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d96e0fe6a771b8a6c773ec0ef4309236ea72e0ed..401d61b7a25b737de6ab5db3256f7a50b993f766 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -239,19 +239,19 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); + int32_t flen = 0; + for (int32_t i = 0; i < pSchema->numOfCols; i++) { + flen += TYPE_BYTES[pSchema->columns[i].type]; + } + // construct data - int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize; + int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + flen; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); - int32_t flen = 0; - for (int32_t i = 0; i < pSchema->numOfCols; i++) { - flen += TYPE_BYTES[pSchema->columns[i].type]; - } - SDataRow trow = (SDataRow)pBlk->data; dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen); @@ -279,5 +279,6 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { // write into vnode write queue pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ); + free(buffer); } diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index fb529ee67c31c40d1ad2dded48319a602645f65a..6a6da0a2a52844e4a2c42bad64658c93b48fffae 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); void dnodeStartStream(); -void dnodeCleanUpModules(); +void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); #ifdef __cplusplus diff --git a/src/inc/tcq.h b/src/inc/tcq.h index 32b75674c3278b3273fd4b98dd645f4168543155..9d987da468eaa83a0324d106fa19563c95608e6d 100644 --- a/src/inc/tcq.h +++ b/src/inc/tcq.h @@ -27,7 +27,7 @@ typedef struct { int vgId; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; - char db[TSDB_DB_NAME_LEN]; + char db[TSDB_DB_NAME_LEN + 1]; FCqWrite cqWrite; } SCqCfg; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 26ba392a7675c7848fb4f06f99e9aa2742b6c44b..e4f1f7d2f5d5b116f83589046ed129ded0f0f96a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "_root"); strcpy(cqCfg.pass, tsInternalPass); - strcpy(cqCfg.db, "db"); // TODO: replace hard coded db name + strcpy(cqCfg.db, pVnode->db); cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg);