提交 40c1d665 编写于 作者: B Bomin Zhang

fix some issues

上级 69142ea5
......@@ -477,6 +477,14 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
}
}
static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
STscObj *pObj = (STscObj *)taos;
......@@ -521,7 +529,13 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
pSql->param = pSql;
pSql->fp = asyncCallback;
pRes->code = tscToSQLCmd(pSql, &SQLInfo);
if (pRes->code == TSDB_CODE_ACTION_IN_PROGRESS) {
sem_wait(&pSql->rspSem);
}
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) {
......
......@@ -70,7 +70,6 @@ typedef struct {
int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part
int flen; // First part length in a SDataRow after the header part
int32_t version;
STColumn columns[];
} STSchema;
......
......@@ -239,19 +239,19 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
cTrace("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t flen = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
flen += TYPE_BYTES[pSchema->columns[i].type];
}
// construct data
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pObj->rowSize;
int size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + flen;
char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer;
SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
int32_t flen = 0;
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
flen += TYPE_BYTES[pSchema->columns[i].type];
}
SDataRow trow = (SDataRow)pBlk->data;
dataRowSetLen(trow, TD_DATA_ROW_HEAD_SIZE + flen);
......@@ -279,5 +279,6 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
// write into vnode write queue
pContext->cqWrite(pContext->ahandle, pHead, TAOS_QTYPE_CQ);
free(buffer);
}
......@@ -23,7 +23,7 @@ extern "C" {
int32_t dnodeInitModules();
void dnodeStartModules();
void dnodeStartStream();
void dnodeCleanUpModules();
void dnodeCleanupModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus);
#ifdef __cplusplus
......
......@@ -27,7 +27,7 @@ typedef struct {
int vgId;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN];
char db[TSDB_DB_NAME_LEN + 1];
FCqWrite cqWrite;
} SCqCfg;
......
......@@ -210,7 +210,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "_root");
strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, "db"); // TODO: replace hard coded db name
strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册