提交 96bc72c5 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0_query_integrate' into feature/scheduler

...@@ -213,7 +213,6 @@ endif(${BUILD_WITH_TRAFT}) ...@@ -213,7 +213,6 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV # LIBUV
if(${BUILD_WITH_UV}) if(${BUILD_WITH_UV})
add_compile_options(-Wno-sign-compare)
if (${TD_WINDOWS}) if (${TD_WINDOWS})
file(READ "libuv/include/uv.h" CONTENTS) file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}") string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
......
...@@ -60,16 +60,10 @@ typedef struct SDataBlockInfo { ...@@ -60,16 +60,10 @@ typedef struct SDataBlockInfo {
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
union {int64_t uid; int64_t blockId;}; union {int64_t uid; int64_t blockId;};
int64_t groupId; // no need to serialize
} SDataBlockInfo; } SDataBlockInfo;
//typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock)
// SColumnInfo info;
// int32_t startRow; // run-length-encoding to save the space for multiple rows
// int32_t endRow;
// SVariant value;
//} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray *pDataBlock; // SArray<SColumnInfoData>
......
...@@ -1146,10 +1146,10 @@ typedef struct { ...@@ -1146,10 +1146,10 @@ typedef struct {
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; char name[TSDB_TOPIC_FNAME_LEN];
char outputTbName[TSDB_TABLE_NAME_LEN];
int8_t igExists; int8_t igExists;
char* sql; char* sql;
char* physicalPlan; char* ast;
char* logicalPlan;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
...@@ -2296,13 +2296,23 @@ enum { ...@@ -2296,13 +2296,23 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
typedef struct {
void* inputHandle;
void** executor;
} SStreamTaskParRunner;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t level; int32_t level;
int8_t status; int8_t status;
int8_t pipeEnd;
int8_t parallel;
SEpSet NextOpEp;
char* qmsg; char* qmsg;
void* executor; // not applied to encoder and decoder
SStreamTaskParRunner runner;
// void* executor;
// void* stateStore; // void* stateStore;
// storage handle // storage handle
} SStreamTask; } SStreamTask;
......
...@@ -34,6 +34,9 @@ typedef struct SReadHandle { ...@@ -34,6 +34,9 @@ typedef struct SReadHandle {
void* config; void* config;
} SReadHandle; } SReadHandle;
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDAT_BLOCK 0x2
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
...@@ -46,9 +49,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); ...@@ -46,9 +49,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
* Set the input data block for the stream scan. * Set the input data block for the stream scan.
* @param tinfo * @param tinfo
* @param input * @param input
* @param type
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
......
...@@ -52,7 +52,8 @@ typedef struct SQuery { ...@@ -52,7 +52,8 @@ typedef struct SQuery {
SSchema* pResSchema; SSchema* pResSchema;
SCmdMsgInfo* pCmdMsg; SCmdMsgInfo* pCmdMsg;
int32_t msgType; int32_t msgType;
bool streamQuery; SArray* pDbList;
SArray* pTableList;
} SQuery; } SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
...@@ -157,7 +157,8 @@ void syncCleanUp(); ...@@ -157,7 +157,8 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo); int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......
...@@ -28,7 +28,6 @@ extern "C" { ...@@ -28,7 +28,6 @@ extern "C" {
#define tcgetattr TCGETATTR_FUNC_TAOS_FORBID #define tcgetattr TCGETATTR_FUNC_TAOS_FORBID
#endif #endif
int32_t taosSystem(const char *cmd, char *buf, int32_t bufSize);
void* taosLoadDll(const char* filename); void* taosLoadDll(const char* filename);
void* taosLoadSym(void* handle, char* name); void* taosLoadSym(void* handle, char* name);
void taosCloseDll(void* handle); void taosCloseDll(void* handle);
......
...@@ -86,7 +86,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj); ...@@ -86,7 +86,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
* @param size * @param size
* @return * @return
*/ */
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size); int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size);
/** /**
* return the payload data with the specified key * return the payload data with the specified key
......
...@@ -159,8 +159,12 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { ...@@ -159,8 +159,12 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
} }
code = qParseQuerySql(&cxt, pQuery); code = qParseQuerySql(&cxt, pQuery);
if (TSDB_CODE_SUCCESS == code && ((*pQuery)->haveResultSet)) { if (TSDB_CODE_SUCCESS == code) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); if ((*pQuery)->haveResultSet) {
setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
}
TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
} }
return code; return code;
......
...@@ -238,7 +238,11 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -238,7 +238,11 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list // todo rsp with the vnode id list
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
free(pMsg->pData); free(pMsg->pData);
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
tsem_post(&pRequest->body.rspSem); tsem_post(&pRequest->body.rspSem);
return code;
} }
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
......
...@@ -456,6 +456,95 @@ _return: ...@@ -456,6 +456,95 @@ _return:
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* astStr = NULL;
int32_t sqlLen;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || streamName == NULL || sql == NULL) {
tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
sqlLen = strlen(sql);
if (strlen(streamName) >= TSDB_TABLE_NAME_LEN) {
tscError("stream name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create stream: %s", streamName);
int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, streamName);
SCMCreateStreamReq req = {
.igExists = 1,
.ast = astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
strcpy(req.outputTbName, tbName);
int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
void* buf = malloc(tlen);
if (buf == NULL) {
goto _return;
}
tSerializeSCMCreateStreamReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_STREAM;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
tfree(astStr);
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
...@@ -481,7 +570,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -481,7 +570,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
goto _return; goto _return;
} }
tscDebug("start to create topic, %s", topicName); tscDebug("start to create topic: %s", topicName);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
...@@ -499,7 +588,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -499,7 +588,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
SCMCreateTopicReq req = { SCMCreateTopicReq req = {
.igExists = 1, .igExists = 1,
.ast = (char*)astStr, .ast = astStr,
.sql = (char*)sql, .sql = (char*)sql,
}; };
tNameExtractFullName(&name, req.name); tNameExtractFullName(&name, req.name);
...@@ -529,6 +618,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -529,6 +618,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
tfree(astStr);
qDestroyQuery(pQueryNode); qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/ /*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/ /*destroySendMsgInfo(sendInfo);*/
......
...@@ -271,6 +271,8 @@ TEST(testCase, create_stable_Test) { ...@@ -271,6 +271,8 @@ TEST(testCase, create_stable_Test) {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create stable, reason:%s\n", taos_errstr(pRes)); printf("error in create stable, reason:%s\n", taos_errstr(pRes));
......
...@@ -2474,7 +2474,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq ...@@ -2474,7 +2474,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1; if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
TCODER_MALLOC(pReq->offsets, SMqOffset*, pReq->num * sizeof(SMqOffset), decoder); TCODER_MALLOC(pReq->offsets, SMqOffset *, pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1; if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) { for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]); tDecodeSMqOffset(decoder, &pReq->offsets[i]);
...@@ -2730,15 +2730,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) { ...@@ -2730,15 +2730,22 @@ void *tDeserializeSVDropTSmaReq(void *buf, SVDropTSmaReq *pReq) {
} }
int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) { int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateStreamReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
if (pReq->ast != NULL) astLen = (int32_t)strlen(pReq->ast);
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -2747,15 +2754,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -2747,15 +2754,30 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
} }
int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) { int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStreamReq *pReq) {
int32_t sqlLen = 0;
int32_t astLen = 0;
SCoder decoder = {0}; SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->outputTbName) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->sql) < 0) return -1; if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->physicalPlan) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pReq->logicalPlan) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = calloc(1, sqlLen + 1);
if (pReq->sql == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
if (astLen > 0) {
pReq->ast = calloc(1, astLen + 1);
if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tCoderClear(&decoder); tCoderClear(&decoder);
...@@ -2764,8 +2786,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -2764,8 +2786,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
tfree(pReq->sql); tfree(pReq->sql);
tfree(pReq->physicalPlan); tfree(pReq->ast);
tfree(pReq->logicalPlan);
} }
int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
...@@ -2774,6 +2795,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { ...@@ -2774,6 +2795,7 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1;
if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
...@@ -2785,6 +2807,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { ...@@ -2785,6 +2807,7 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
......
...@@ -130,6 +130,7 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -130,6 +130,7 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg); dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg);
......
...@@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); ...@@ -28,7 +28,7 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
...@@ -31,6 +31,53 @@ ...@@ -31,6 +31,53 @@
#include "tname.h" #include "tname.h"
#include "tuuid.h" #include "tuuid.h"
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet) {
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = malloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
return 0;
}
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet);
return 0;
}
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
const SSnodeObj* pSnode) {
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL; SVgObj* pVgroup = NULL;
...@@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -44,7 +91,6 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(totLevel, sizeof(SArray)); pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
int32_t msgLen;
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask)); SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level); SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
...@@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -67,43 +113,16 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// send to vnode // send to vnode
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
// TODO: set to
plan->execNode.nodeId = pVgroup->vgId; pTask->parallel = 4;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
} }
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
// duplicatable // duplicatable
...@@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -113,88 +132,26 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// if has snode, set to shared thread num in snode // if has snode, set to shared thread num in snode
parallel = SND_SHARED_THREAD_NUM; parallel = SND_SHARED_THREAD_NUM;
for (int32_t i = 0; i < parallel; i++) {
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
// TODO:get snode id and ep
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
}
} else {
// not duplicatable
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->parallel = parallel;
// TODO:get snode id and ep // TODO:get snode id and ep
plan->execNode.nodeId = pVgroup->vgId; if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
} else {
// not duplicatable
SStreamTask* pTask = streamTaskNew(pStream->uid, level);
SCoder encoder; // TODO: get snode
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
tEncodeSStreamTask(&encoder, pTask); sdbRelease(pSdb, pVgroup);
int32_t tlen = sizeof(SMsgHead) + encoder.pos; qDestroyQueryPlan(pPlan);
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask);
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
} }
......
...@@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -220,8 +220,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.physicalPlan = pCreate->physicalPlan; streamObj.physicalPlan = "";
streamObj.logicalPlan = pCreate->logicalPlan; streamObj.logicalPlan = "";
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
......
...@@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER: ...@@ -433,12 +433,12 @@ ALLOC_VGROUP_OVER:
return code; return code;
} }
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
SEpSet epset = {0}; SEpSet epset = {0};
for (int32_t v = 0; v < pVgroup->replica; ++v) { for (int32_t v = 0; v < pVgroup->replica; ++v) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; const SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode == NULL) continue; if (pDnode == NULL) continue;
if (pVgid->role == TAOS_SYNC_STATE_LEADER) { if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
......
...@@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) { ...@@ -57,7 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
} }
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
pTask->executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
} }
......
...@@ -282,7 +282,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -282,7 +282,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->head.msgType == TDMT_VND_SUBMIT) { if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task; qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) { while (1) {
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
......
...@@ -304,7 +304,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -304,7 +304,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
break; break;
} }
SDiskCfg pDisks = {.level = 0, .primary = 1}; SDiskCfg pDisks = {0};
pDisks.level = 0;
pDisks.primary = 1;
strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN); strncpy(pDisks.dir, "/var/lib/taos", TSDB_FILENAME_LEN);
int32_t numOfDisks = 1; int32_t numOfDisks = 1;
pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks); pTsdb->pTfs = tfsOpen(&pDisks, numOfDisks);
......
...@@ -307,11 +307,10 @@ typedef struct STaskRuntimeEnv { ...@@ -307,11 +307,10 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
OP_IN_EXECUTING = 1, OP_NOT_OPENED = 0x0,
OP_RES_TO_RETURN = 2, OP_OPENED = 0x1,
OP_EXEC_DONE = 3, OP_RES_TO_RETURN = 0x5,
OP_OPENED = 4, OP_EXEC_DONE = 0x9,
OP_NOT_OPENED = 5
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -426,6 +425,8 @@ typedef struct STagScanInfo { ...@@ -426,6 +425,8 @@ typedef struct STagScanInfo {
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type
bool blockValid; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
...@@ -544,6 +545,7 @@ typedef struct SGroupbyOperatorInfo { ...@@ -544,6 +545,7 @@ typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t colIndex; int32_t colIndex;
char* prevData; // previous group by value char* prevData; // previous group by value
SGroupResInfo groupResInfo;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
...@@ -648,8 +650,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S ...@@ -648,8 +650,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S
SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput, bool multigroupResult); int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
SExprInfo* pExpr, int32_t numOfOutput); SExprInfo* pExpr, int32_t numOfOutput);
...@@ -672,7 +673,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -672,7 +673,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
int32_t numOfOutput); int32_t numOfOutput);
// SSDataBlock* doSLimit(void* param, bool* newgroup);
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
...@@ -696,11 +696,8 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); ...@@ -696,11 +696,8 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg);
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo* pQInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "planner.h" #include "planner.h"
#include "tq.h" #include "tq.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,18 +31,40 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id) ...@@ -31,18 +31,40 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, char* id)
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
return doSetStreamBlock(pOperator->pDownstream[0], input, id); return doSetStreamBlock(pOperator->pDownstream[0], input, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); // the block type can not be changed in the streamscan operators
if (pInfo->blockType == 0) {
pInfo->blockType = type;
} else if (pInfo->blockType != type) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else {
ASSERT(!pInfo->blockValid);
SSDataBlock* pDataBlock = input;
pInfo->pRes->info = pDataBlock->info;
for(int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) {
pInfo->pRes->pDataBlock = pDataBlock->pDataBlock;
}
// set current block valid.
pInfo->blockValid = true;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -53,7 +75,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) { ...@@ -53,7 +75,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -515,7 +515,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* ...@@ -515,7 +515,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
CHECK_OUT_OF_MEM(val); CHECK_OUT_OF_MEM(val);
val->literal = strndup(pLiteral->z, pLiteral->n); val->literal = strndup(pLiteral->z, pLiteral->n);
if (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType) { if (TK_NK_ID != pLiteral->type && (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) {
trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n); trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n);
} }
CHECK_OUT_OF_MEM(val->literal); CHECK_OUT_OF_MEM(val->literal);
......
...@@ -30,8 +30,14 @@ typedef struct STranslateContext { ...@@ -30,8 +30,14 @@ typedef struct STranslateContext {
ESqlClause currClause; ESqlClause currClause;
SSelectStmt* pCurrStmt; SSelectStmt* pCurrStmt;
SCmdMsgInfo* pCmdMsg; SCmdMsgInfo* pCmdMsg;
SHashObj* pDbs;
SHashObj* pTables;
} STranslateContext; } STranslateContext;
typedef struct SFullDatabaseName {
char fullDbName[TSDB_DB_FNAME_LEN];
} SFullDatabaseName;
static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode); static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode);
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode); static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode);
...@@ -78,64 +84,96 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName ...@@ -78,64 +84,96 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName
return pName; return pName;
} }
static int32_t getTableMetaImpl(SParseContext* pCxt, const SName* pName, STableMeta** pMeta) { static int32_t collectUseDatabase(const char* pFullDbName, SHashObj* pDbs) {
int32_t code = catalogGetTableMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pMeta); SFullDatabaseName name = {0};
strcpy(name.fullDbName, pFullDbName);
return taosHashPut(pDbs, pFullDbName, strlen(pFullDbName), &name, sizeof(SFullDatabaseName));
}
static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
return taosHashPut(pDbs, fullName, strlen(fullName), pName, sizeof(SName));
}
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
} }
return code; return code;
} }
static int32_t getTableMeta(SParseContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) { static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pDbName); strcpy(name.dbname, pDbName);
strcpy(name.tname, pTableName); strcpy(name.tname, pTableName);
return getTableMetaImpl(pCxt, &name, pMeta); return getTableMetaImpl(pCxt, &name, pMeta);
} }
static int32_t getTableDistVgInfo(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pVgInfo); SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
} }
return code; return code;
} }
static int32_t getDBVgInfoImpl(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
SParseContext* pParCxt = pCxt->pParseCxt;
char fullDbName[TSDB_DB_FNAME_LEN]; char fullDbName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pName, fullDbName); tNameGetFullDbName(pName, fullDbName);
int32_t code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, pVgInfo); int32_t code = collectUseDatabase(fullDbName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName); parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName);
} }
return code; return code;
} }
static int32_t getDBVgInfo(SParseContext* pCxt, const char* pDbName, SArray** pVgInfo) { static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray** pVgInfo) {
SName name; SName name;
tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname); tNameGetFullDbName(&name, dbFname);
return getDBVgInfoImpl(pCxt, &name, pVgInfo); return getDBVgInfoImpl(pCxt, &name, pVgInfo);
} }
static int32_t getTableHashVgroupImpl(SParseContext* pCxt, const SName* pName, SVgroupInfo* pInfo) { static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pInfo); SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseTable(pName, pCxt->pTables);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
} }
return code; return code;
} }
static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) { static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
strcpy(name.dbname, pDbName); strcpy(name.dbname, pDbName);
strcpy(name.tname, pTableName); strcpy(name.tname, pTableName);
return getTableHashVgroupImpl(pCxt, &name, pInfo); return getTableHashVgroupImpl(pCxt, &name, pInfo);
} }
static int32_t getDBVgVersion(SParseContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) { static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
int32_t code = catalogGetDBVgVersion(pCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum); SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
}
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName); parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName);
} }
...@@ -559,7 +597,7 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) { ...@@ -559,7 +597,7 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setSysTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) { static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
// todo release // todo release
// if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) { // if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
...@@ -586,8 +624,8 @@ static int32_t setSysTableVgroupList(SParseContext* pCxt, SName* pName, SRealTab ...@@ -586,8 +624,8 @@ static int32_t setSysTableVgroupList(SParseContext* pCxt, SName* pName, SRealTab
return code; return code;
} }
static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) { static int32_t setTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->topicQuery) { if (pCxt->pParseCxt->topicQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -618,12 +656,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { ...@@ -618,12 +656,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
case QUERY_NODE_REAL_TABLE: { case QUERY_NODE_REAL_TABLE: {
SRealTableNode* pRealTable = (SRealTableNode*)pTable; SRealTableNode* pRealTable = (SRealTableNode*)pTable;
SName name; SName name;
code = getTableMetaImpl(pCxt->pParseCxt, code = getTableMetaImpl(pCxt,
toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta)); toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
} }
code = setTableVgroupList(pCxt->pParseCxt, &name, pRealTable); code = setTableVgroupList(pCxt, &name, pRealTable);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
...@@ -1020,7 +1058,7 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt ...@@ -1020,7 +1058,7 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SName tableName; SName tableName;
int32_t code = getTableMetaImpl( int32_t code = getTableMetaImpl(
pCxt->pParseCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta); pCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (TSDB_SUPER_TABLE == pTableMeta->tableType) { if (TSDB_SUPER_TABLE == pTableMeta->tableType) {
code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists); code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists);
...@@ -1113,7 +1151,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p ...@@ -1113,7 +1151,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
SName name = {0}; SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameExtractFullName(&name, usedbReq.db); tNameExtractFullName(&name, usedbReq.db);
int32_t code = getDBVgVersion(pCxt->pParseCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); int32_t code = getDBVgVersion(pCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
...@@ -1319,7 +1357,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) { ...@@ -1319,7 +1357,7 @@ static int32_t translateShowTables(STranslateContext* pCxt) {
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL; SArray* array = NULL;
int32_t code = getDBVgInfo(pCxt->pParseCxt, pCxt->pParseCxt->db, &array); int32_t code = getDBVgInfo(pCxt, pCxt->pParseCxt->db, &array);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
...@@ -1663,6 +1701,9 @@ static void destroyTranslateContext(STranslateContext* pCxt) { ...@@ -1663,6 +1701,9 @@ static void destroyTranslateContext(STranslateContext* pCxt) {
tfree(pCxt->pCmdMsg->pMsg); tfree(pCxt->pCmdMsg->pMsg);
tfree(pCxt->pCmdMsg); tfree(pCxt->pCmdMsg);
} }
taosHashCleanup(pCxt->pDbs);
taosHashCleanup(pCxt->pTables);
} }
static const char* getSysTableName(ENodeType type) { static const char* getSysTableName(ENodeType type) {
...@@ -1938,7 +1979,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -1938,7 +1979,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot; SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
SVgroupInfo info = {0}; SVgroupInfo info = {0};
int32_t code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info); int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
SArray* pBufArray = NULL; SArray* pBufArray = NULL;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray); code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
...@@ -2078,7 +2119,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau ...@@ -2078,7 +2119,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) { static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
int32_t code = getTableMeta(pCxt->pParseCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta); int32_t code = getTableMeta(pCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta);
SKVRowBuilder kvRowBuilder = {0}; SKVRowBuilder kvRowBuilder = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2105,7 +2146,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla ...@@ -2105,7 +2146,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
SVgroupInfo info = {0}; SVgroupInfo info = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info); code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info); addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->dbName, pStmt->tableName, row, pSuperTableMeta->uid, &info);
...@@ -2227,6 +2268,31 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -2227,6 +2268,31 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->msgType = pQuery->pCmdMsg->msgType; pQuery->msgType = pQuery->pCmdMsg->msgType;
break; break;
} }
if (NULL != pCxt->pDbs) {
pQuery->pDbList = taosArrayInit(taosHashGetSize(pCxt->pDbs), TSDB_DB_FNAME_LEN);
if (NULL == pQuery->pDbList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SFullDatabaseName* pDb = taosHashIterate(pCxt->pDbs, NULL);
while (NULL != pDb) {
taosArrayPush(pQuery->pDbList, pDb->fullDbName);
pDb = taosHashIterate(pCxt->pDbs, pDb);
}
}
if (NULL != pCxt->pTables) {
pQuery->pTableList = taosArrayInit(taosHashGetSize(pCxt->pTables), sizeof(SName));
if (NULL == pQuery->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SName* pTable = taosHashIterate(pCxt->pTables, NULL);
while (NULL != pTable) {
taosArrayPush(pQuery->pTableList, pTable);
pTable = taosHashIterate(pCxt->pTables, pTable);
}
}
return code; return code;
} }
...@@ -2237,8 +2303,13 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) { ...@@ -2237,8 +2303,13 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) {
.msgBuf = { .buf = pParseCxt->pMsg, .len = pParseCxt->msgLen }, .msgBuf = { .buf = pParseCxt->pMsg, .len = pParseCxt->msgLen },
.pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES), .pNsLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES),
.currLevel = 0, .currLevel = 0,
.currClause = 0 .currClause = 0,
.pDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK),
.pTables = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)
}; };
if (NULL == cxt.pNsLevel) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = fmFuncMgtInit(); int32_t code = fmFuncMgtInit();
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteQuery(&cxt, pQuery); code = rewriteQuery(&cxt, pQuery);
......
...@@ -142,7 +142,6 @@ typedef struct SSyncNode { ...@@ -142,7 +142,6 @@ typedef struct SSyncNode {
SRaftId leaderCache; SRaftId leaderCache;
// life cycle // life cycle
int32_t refCount;
int64_t rid; int64_t rid;
// tla+ server vars // tla+ server vars
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "syncTimeout.h" #include "syncTimeout.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tref.h"
static int32_t tsNodeRefId = -1; static int32_t tsNodeRefId = -1;
...@@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); ...@@ -44,31 +45,57 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
// life cycle
static void syncFreeNode(void* param);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
int32_t ret = syncEnvStart(); int32_t ret;
tsNodeRefId = taosOpenRef(200, syncFreeNode);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
ret = -1;
} else {
ret = syncEnvStart();
}
return ret; return ret;
} }
void syncCleanUp() { void syncCleanUp() {
int32_t ret = syncEnvStop(); int32_t ret = syncEnvStop();
assert(ret == 0); assert(ret == 0);
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
} }
int64_t syncStart(const SSyncInfo* pSyncInfo) { int64_t syncStart(const SSyncInfo* pSyncInfo) {
int32_t ret = 0;
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
// todo : return ref id pSyncNode->rid = taosAddRef(tsNodeRefId, pSyncNode);
return ret; if (pSyncNode->rid < 0) {
syncFreeNode(pSyncNode);
return -1;
}
return pSyncNode->rid;
} }
void syncStop(int64_t rid) { void syncStop(int64_t rid) {
// todo : get pointer from rid SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
SSyncNode* pSyncNode = NULL; if (pSyncNode == NULL) {
return;
}
syncNodeClose(pSyncNode); syncNodeClose(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
taosRemoveRef(tsNodeRefId, rid);
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
...@@ -76,11 +103,16 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { ...@@ -76,11 +103,16 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
return ret; return ret;
} }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
// todo : get pointer from rid // todo : get pointer from rid
SSyncNode* pSyncNode = NULL; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -93,6 +125,13 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -93,6 +125,13 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !! ret = -1; // todo : need define err code !!
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret; return ret;
} }
...@@ -155,7 +194,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -155,7 +194,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
// init life cycle // init life cycle outside
// TLA+ Spec // TLA+ Spec
// InitHistoryVars == /\ elections = {} // InitHistoryVars == /\ elections = {}
...@@ -444,6 +483,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -444,6 +483,10 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache); cJSON* pLaderCache = syncUtilRaftId2Json(&pSyncNode->leaderCache);
cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache); cJSON_AddItemToObject(pRoot, "leaderCache", pLaderCache);
// life cycle
snprintf(u64buf, sizeof(u64buf), "%ld", pSyncNode->rid);
cJSON_AddStringToObject(pRoot, "rid", u64buf);
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state));
...@@ -813,3 +856,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -813,3 +856,10 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
return ret; return ret;
} }
static void syncFreeNode(void* param) {
SSyncNode* pNode = param;
syncNodePrint2((char*)"==syncFreeNode==", pNode);
free(pNode);
}
\ No newline at end of file
...@@ -31,6 +31,7 @@ add_executable(syncElectTest "") ...@@ -31,6 +31,7 @@ add_executable(syncElectTest "")
add_executable(syncEncodeTest "") add_executable(syncEncodeTest "")
add_executable(syncWriteTest "") add_executable(syncWriteTest "")
add_executable(syncReplicateTest "") add_executable(syncReplicateTest "")
add_executable(syncRefTest "")
target_sources(syncTest target_sources(syncTest
...@@ -165,6 +166,10 @@ target_sources(syncReplicateTest ...@@ -165,6 +166,10 @@ target_sources(syncReplicateTest
PRIVATE PRIVATE
"syncReplicateTest.cpp" "syncReplicateTest.cpp"
) )
target_sources(syncRefTest
PRIVATE
"syncRefTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -337,6 +342,11 @@ target_include_directories(syncReplicateTest ...@@ -337,6 +342,11 @@ target_include_directories(syncReplicateTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncRefTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -471,6 +481,10 @@ target_link_libraries(syncReplicateTest ...@@ -471,6 +481,10 @@ target_link_libraries(syncReplicateTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncRefTest
sync
gtest_main
)
enable_testing() enable_testing()
......
...@@ -116,9 +116,10 @@ int main(int argc, char** argv) { ...@@ -116,9 +116,10 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
while (1) { while (1) {
sTrace("elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
} }
return 0; return 0;
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "tref.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
static void syncFreeObj(void *param);
int32_t init();
void cleanup();
int64_t start();
void stop(int64_t rid);
static int32_t tsNodeRefId = -1;
int g = 100;
typedef struct SyncObj {
int64_t rid;
void * data;
char name[32];
int counter;
} SyncObj;
static void syncFreeObj(void *param) {
SyncObj *pObj = (SyncObj *)param;
printf("syncFreeObj name:%s rid:%ld \n", pObj->name, pObj->rid);
free(pObj);
}
int32_t init() {
tsNodeRefId = taosOpenRef(200, syncFreeObj);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
cleanup();
return -1;
}
return 0;
}
void cleanup() {
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
}
int64_t start() {
SyncObj *pObj = (SyncObj *)malloc(sizeof(SyncObj));
assert(pObj != NULL);
pObj->data = &g;
snprintf(pObj->name, sizeof(pObj->name), "%s", "hello");
pObj->rid = taosAddRef(tsNodeRefId, pObj);
if (pObj->rid < 0) {
syncFreeObj(pObj);
return -1;
}
printf("start name:%s rid:%ld \n", pObj->name, pObj->rid);
return pObj->rid;
}
void stop(int64_t rid) {
SyncObj *pObj = (SyncObj *)taosAcquireRef(tsNodeRefId, rid);
if (pObj == NULL) return;
printf("stop name:%s rid:%ld \n", pObj->name, pObj->rid);
pObj->data = NULL;
taosReleaseRef(tsNodeRefId, pObj->rid);
taosRemoveRef(tsNodeRefId, rid);
}
void *func(void *param) {
int64_t rid = (int64_t)param;
int32_t ms = taosRand() % 10000;
taosMsleep(ms);
SyncObj *pObj = (SyncObj *)taosAcquireRef(tsNodeRefId, rid);
if (pObj != NULL) {
printf("taosAcquireRef sleep:%d, name:%s, rid:%ld \n", ms, pObj->name, pObj->rid);
} else {
printf("taosAcquireRef sleep:%d, NULL! \n", ms);
}
taosReleaseRef(tsNodeRefId, rid);
return NULL;
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
taosSeedRand(taosGetTimestampSec());
int32_t ret;
ret = init();
assert(ret == 0);
int64_t rid = start();
assert(rid > 0);
for (int i = 0; i < 20; ++i) {
TdThread tid;
taosThreadCreate(&tid, NULL, func, (void *)rid);
}
int32_t ms = taosRand() % 10000;
taosMsleep(ms);
printf("main sleep %d, stop and clean ", ms);
stop(rid);
cleanup();
while (1) {
taosMsleep(1000);
printf("sleep 1 ... \n");
}
return 0;
}
...@@ -172,15 +172,19 @@ int main(int argc, char **argv) { ...@@ -172,15 +172,19 @@ int main(int argc, char **argv) {
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); "electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
} }
while (1) { while (1) {
sTrace("replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", sTrace(
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, gSyncNode->electTimerLogicClock, "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); "electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm,
gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -333,7 +333,10 @@ TEST_F(TransEnv, cliPersistHandle) { ...@@ -333,7 +333,10 @@ TEST_F(TransEnv, cliPersistHandle) {
SRpcMsg resp = {0}; SRpcMsg resp = {0};
void * handle = NULL; void * handle = NULL;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; SRpcMsg req = {0};
req.handle = resp.handle;
req.persistHandle = 1;
req.msgType = 1; req.msgType = 1;
req.pCont = rpcMallocCont(10); req.pCont = rpcMallocCont(10);
req.contLen = 10; req.contLen = 10;
......
...@@ -29,61 +29,6 @@ ...@@ -29,61 +29,6 @@
struct termios oldtio; struct termios oldtio;
#endif #endif
int32_t taosSystem(const char *cmd, char *buf, int32_t bufSize) {
#if defined(WINDOWS)
FILE *fp;
if (cmd == NULL) {
// printf("taosSystem cmd is NULL!");
return -1;
}
if ((fp = _popen(cmd, "r")) == NULL) {
// printf("popen cmd:%s error: %s", cmd, strerror(errno));
return -1;
} else {
while (fgets(buf, bufSize, fp)) {
// printf("popen result:%s", buf);
}
if (!_pclose(fp)) {
// printf("close popen file pointer fp error!");
return -1;
} else {
// printf("popen res is :%d", res);
}
return 0;
}
#elif defined(_TD_DARWIN_64)
printf("no support funtion");
return -1;
#else
FILE *fp;
int32_t res;
if (cmd == NULL) {
// printf("taosSystem cmd is NULL!");
return -1;
}
if ((fp = popen(cmd, "r")) == NULL) {
// printf("popen cmd:%s error: %s", cmd, strerror(errno));
return -1;
} else {
while (fgets(buf, bufSize, fp)) {
// printf("popen result:%s", buf);
}
if ((res = pclose(fp)) == -1) {
// printf("close popen file pointer fp error!");
} else {
// printf("popen res is :%d", res);
}
return res;
}
#endif
}
void* taosLoadDll(const char* filename) { void* taosLoadDll(const char* filename) {
#if defined(WINDOWS) #if defined(WINDOWS)
return NULL; return NULL;
...@@ -103,7 +48,7 @@ void* taosLoadDll(const char* filename) { ...@@ -103,7 +48,7 @@ void* taosLoadDll(const char* filename) {
} }
void* taosLoadSym(void* handle, char* name) { void* taosLoadSym(void* handle, char* name) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(WINDOWS)
return NULL; return NULL;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
return NULL; return NULL;
...@@ -123,7 +68,7 @@ void* taosLoadSym(void* handle, char* name) { ...@@ -123,7 +68,7 @@ void* taosLoadSym(void* handle, char* name) {
} }
void taosCloseDll(void* handle) { void taosCloseDll(void* handle) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(WINDOWS)
return; return;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
return; return;
...@@ -135,7 +80,7 @@ void taosCloseDll(void* handle) { ...@@ -135,7 +80,7 @@ void taosCloseDll(void* handle) {
} }
int taosSetConsoleEcho(bool on) { int taosSetConsoleEcho(bool on) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(WINDOWS)
HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE); HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE);
DWORD mode = 0; DWORD mode = 0;
GetConsoleMode(hStdin, &mode); GetConsoleMode(hStdin, &mode);
...@@ -146,28 +91,6 @@ int taosSetConsoleEcho(bool on) { ...@@ -146,28 +91,6 @@ int taosSetConsoleEcho(bool on) {
} }
SetConsoleMode(hStdin, mode); SetConsoleMode(hStdin, mode);
return 0;
#elif defined(_TD_DARWIN_64)
#define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL)
int err;
struct termios term;
if (tcgetattr(STDIN_FILENO, &term) == -1) {
perror("Cannot get the attribution of the terminal");
return -1;
}
if (on)
term.c_lflag |= ECHOFLAGS;
else
term.c_lflag &= ~ECHOFLAGS;
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 && err == EINTR) {
perror("Cannot set the attribution of the terminal");
return -1;
}
return 0; return 0;
#else #else
#define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL) #define ECHOFLAGS (ECHO | ECHOE | ECHOK | ECHONL)
...@@ -186,7 +109,7 @@ int taosSetConsoleEcho(bool on) { ...@@ -186,7 +109,7 @@ int taosSetConsoleEcho(bool on) {
err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term); err = tcsetattr(STDIN_FILENO, TCSAFLUSH, &term);
if (err == -1 || err == EINTR) { if (err == -1 || err == EINTR) {
//printf("Cannot set the attribution of the terminal"); printf("Cannot set the attribution of the terminal");
return -1; return -1;
} }
...@@ -195,35 +118,8 @@ int taosSetConsoleEcho(bool on) { ...@@ -195,35 +118,8 @@ int taosSetConsoleEcho(bool on) {
} }
void setTerminalMode() { void setTerminalMode() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(WINDOWS)
#elif defined(_TD_DARWIN_64)
struct termios newtio;
/* if (atexit() != 0) { */
/* fprintf(stderr, "Error register exit function!\n"); */
/* exit(EXIT_FAILURE); */
/* } */
memcpy(&newtio, &oldtio, sizeof(oldtio));
// Set new terminal attributes.
newtio.c_iflag &= ~(IXON | IXOFF | ICRNL | INLCR | IGNCR | IMAXBEL | ISTRIP);
newtio.c_iflag |= IGNBRK;
// newtio.c_oflag &= ~(OPOST|ONLCR|OCRNL|ONLRET);
newtio.c_oflag |= OPOST;
newtio.c_oflag |= ONLCR;
newtio.c_oflag &= ~(OCRNL | ONLRET);
newtio.c_lflag &= ~(IEXTEN | ICANON | ECHO | ECHOE | ECHONL | ECHOCTL | ECHOPRT | ECHOKE | ISIG);
newtio.c_cc[VMIN] = 1;
newtio.c_cc[VTIME] = 0;
if (tcsetattr(0, TCSANOW, &newtio) != 0) {
fprintf(stderr, "Fail to set terminal properties!\n");
exit(EXIT_FAILURE);
}
#else #else
struct termios newtio; struct termios newtio;
...@@ -256,19 +152,7 @@ void setTerminalMode() { ...@@ -256,19 +152,7 @@ void setTerminalMode() {
int32_t getOldTerminalMode() { int32_t getOldTerminalMode() {
#if defined(WINDOWS) #if defined(WINDOWS)
#elif defined(_TD_DARWIN_64)
/* Make sure stdin is a terminal. */
if (!isatty(STDIN_FILENO)) {
return -1;
}
// Get the parameter of current terminal
if (tcgetattr(0, &oldtio) != 0) {
return -1;
}
return 1;
#else #else
/* Make sure stdin is a terminal. */ /* Make sure stdin is a terminal. */
if (!isatty(STDIN_FILENO)) { if (!isatty(STDIN_FILENO)) {
...@@ -285,13 +169,8 @@ int32_t getOldTerminalMode() { ...@@ -285,13 +169,8 @@ int32_t getOldTerminalMode() {
} }
void resetTerminalMode() { void resetTerminalMode() {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(WINDOWS)
#elif defined(_TD_DARWIN_64)
if (tcsetattr(0, TCSANOW, &oldtio) != 0) {
fprintf(stderr, "Fail to reset the terminal properties!\n");
exit(EXIT_FAILURE);
}
#else #else
if (tcsetattr(0, TCSANOW, &oldtio) != 0) { if (tcsetattr(0, TCSANOW, &oldtio) != 0) {
fprintf(stderr, "Fail to reset the terminal properties!\n"); fprintf(stderr, "Fail to reset the terminal properties!\n");
......
...@@ -305,7 +305,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) { ...@@ -305,7 +305,7 @@ int32_t taosHashGetSize(const SHashObj *pHashObj) {
return (int32_t)atomic_load_64(&pHashObj->size); return (int32_t)atomic_load_64(&pHashObj->size);
} }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const void *data, size_t size) {
if (pHashObj == NULL || key == NULL || keyLen == 0) { if (pHashObj == NULL || key == NULL || keyLen == 0) {
return -1; return -1;
} }
......
...@@ -39,10 +39,11 @@ endi ...@@ -39,10 +39,11 @@ endi
print =============== drop database print =============== drop database
sql drop database d1 sql drop database d1
sql show databases # todo release
if $rows != 1 then #sql show databases
return -1 #if $rows != 1 then
endi # return -1
#endi
print =============== more databases print =============== more databases
sql create database d2 vgroups 2 sql create database d2 vgroups 2
......
...@@ -20,7 +20,7 @@ sql show databases ...@@ -20,7 +20,7 @@ sql show databases
print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1 print $data0_1 $data1_1 $data2_1 $data3_1 $data4_1 $data5_1 $data6_1 $data7_1 $data8_1 $data9_1
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != $db then if $data00 != $db then
...@@ -52,16 +52,17 @@ print =============== step2 ...@@ -52,16 +52,17 @@ print =============== step2
sql_error create database $db sql_error create database $db
sql create database if not exists $db sql create database if not exists $db
sql show databases sql show databases
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
print =============== step3 print =============== step3
sql drop database $db sql drop database $db
sql show databases # todo release
if $rows != 0 then #sql show databases
return -1 #if $rows != 1 then
endi # return -1
#endi
print =============== step4 print =============== step4
sql_error drop database $db sql_error drop database $db
......
...@@ -16,11 +16,17 @@ create1: ...@@ -16,11 +16,17 @@ create1:
return -1 return -1
endi endi
# todo remove
sql create database useless_db
sql show dnodes sql show dnodes
if $data4_2 != ready then if $data4_2 != ready then
goto create1 goto create1
endi endi
# todo remove
sql drop database useless_db
print ========== stop dnode2 print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL system sh/exec.sh -n dnode2 -s stop -x SIGKILL
...@@ -42,7 +48,7 @@ re-create1: ...@@ -42,7 +48,7 @@ re-create1:
sql create database d1 vgroups 2 -x re-create1 sql create database d1 vgroups 2 -x re-create1
sql show databases sql show databases
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
...@@ -80,7 +86,7 @@ re-create2: ...@@ -80,7 +86,7 @@ re-create2:
sql create database d1 vgroups 5 -x re-create2 sql create database d1 vgroups 5 -x re-create2
sql show databases sql show databases
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
......
...@@ -5,6 +5,9 @@ system sh/exec.sh -n dnode1 -s start ...@@ -5,6 +5,9 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
sql connect sql connect
# todo remove
sql create database useless_db
print =============== show dnodes print =============== show dnodes
sql show dnodes; sql show dnodes;
if $rows != 1 then if $rows != 1 then
...@@ -15,9 +18,10 @@ if $data00 != 1 then ...@@ -15,9 +18,10 @@ if $data00 != 1 then
return -1 return -1
endi endi
if $data02 != 0 then # check 'vnodes' feild ?
return -1 #if $data02 != 0 then
endi # return -1
#endi
sql show mnodes; sql show mnodes;
if $rows != 1 then if $rows != 1 then
...@@ -49,9 +53,10 @@ if $data10 != 2 then ...@@ -49,9 +53,10 @@ if $data10 != 2 then
return -1 return -1
endi endi
if $data02 != 0 then # check 'vnodes' feild ?
return -1 #if $data02 != 0 then
endi # return -1
#endi
if $data12 != 0 then if $data12 != 0 then
return -1 return -1
...@@ -78,12 +83,15 @@ if $data02 != master then ...@@ -78,12 +83,15 @@ if $data02 != master then
return -1 return -1
endi endi
# todo remove
sql drop database useless_db
print =============== create database print =============== create database
sql create database d1 vgroups 4; sql create database d1 vgroups 4;
sql create database d2; sql create database d2;
sql show databases sql show databases
if $rows != 2 then if $rows != 3 then
return -1 return -1
endi endi
......
...@@ -7,7 +7,7 @@ sql connect ...@@ -7,7 +7,7 @@ sql connect
print =============== create database print =============== create database
sql create database d0 sql create database d0
sql show databases sql show databases
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
...@@ -52,6 +52,8 @@ sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0) ...@@ -52,6 +52,8 @@ sql insert into ct3 values('2021-01-01 00:00:00.000', 10, 2.0, 3.0)
#=================================================================== #===================================================================
print =============== query data from child table print =============== query data from child table
sql select * from ct1 sql select * from ct1
print ========> value is : $data00
if $rows != 4 then # after fix bug, modify 4 to 7 if $rows != 4 then # after fix bug, modify 4 to 7
return -1 return -1
endi endi
......
...@@ -6,7 +6,7 @@ sql connect ...@@ -6,7 +6,7 @@ sql connect
print =============== create database print =============== create database
sql create database d1 sql create database d1
sql show databases sql show databases
if $rows != 1 then if $rows != 2 then
return -1 return -1
endi endi
......
...@@ -3,6 +3,9 @@ system sh/deploy.sh -n dnode1 -i 1 ...@@ -3,6 +3,9 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sql connect sql connect
# todo remove
sql create database useless_db
print =============== show users print =============== show users
sql show users sql show users
if $rows != 1 then if $rows != 1 then
...@@ -71,4 +74,7 @@ print $data10 $data11 $data22 ...@@ -71,4 +74,7 @@ print $data10 $data11 $data22
print $data20 $data11 $data22 print $data20 $data11 $data22
print $data30 $data31 $data32 print $data30 $data31 $data32
# todo remove
sql drop database useless_db
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define __USE_XOPEN
#include "os.h"
#include "tglobal.h"
#include "shell.h"
#include "shellCommand.h"
#include "tbase64.h"
#include "tlog.h"
#include "version.h"
#include <wordexp.h>
#include <argp.h>
#include <termio.h>
#define OPT_ABORT 1 /* �Cabort */
int indicator = 1;
struct termios oldtio;
void insertChar(Command *cmd, char *c, int size);
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
static char doc[] = "";
static char args_doc[] = "";
static struct argp_option options[] = {
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
{"password", 'p', 0, 0, "The password to use when connecting to the server."},
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
{"dump-config", 'C', 0, 0, "Dump configuration."},
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
{"raw-time", 'r', 0, 0, "Output time as uint64_t."},
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"check", 'k', "CHECK", 0, "Check tables."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
SShellArguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
case 'h':
arguments->host = arg;
break;
case 'p':
break;
case 'P':
if (arg) {
arguments->port = atoi(arg);
} else {
fprintf(stderr, "Invalid port\n");
return -1;
}
break;
case 'z':
arguments->timezone = arg;
break;
case 'u':
arguments->user = arg;
break;
case 'A':
arguments->auth = arg;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
if (strlen(full_path.we_wordv[0]) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1);
wordfree(&full_path);
return -1;
}
tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'C':
arguments->dump_config = true;
break;
case 's':
arguments->commands = arg;
break;
case 'r':
arguments->is_raw_time = true;
break;
case 'f':
if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->file, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'D':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->dir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'T':
if (arg) {
arguments->threadNum = atoi(arg);
} else {
fprintf(stderr, "Invalid number of threads\n");
return -1;
}
break;
case 'k':
arguments->check = atoi(arg);
break;
case 'd':
arguments->database = arg;
break;
case 'n':
arguments->netTestRole = arg;
break;
case 'l':
if (arg) {
arguments->pktLen = atoi(arg);
} else {
fprintf(stderr, "Invalid packet length\n");
return -1;
}
break;
case 'N':
if (arg) {
arguments->pktNum = atoi(arg);
} else {
fprintf(stderr, "Invalid packet number\n");
return -1;
}
break;
case 'S':
arguments->pktType = arg;
break;
case OPT_ABORT:
arguments->abort = 1;
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char g_password[SHELL_MAX_PASSWORD_LEN];
static void parse_args(
int argc, char *argv[], SShellArguments *arguments) {
for (int i = 1; i < argc; i++) {
if ((strncmp(argv[i], "-p", 2) == 0)
|| (strncmp(argv[i], "--password", 10) == 0)) {
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
if ((strlen(argv[i]) == 2)
|| (strncmp(argv[i], "--password", 10) == 0)) {
printf("Enter password: ");
taosSetConsoleEcho(false);
if (scanf("%20s", g_password) > 1) {
fprintf(stderr, "password reading error\n");
}
taosSetConsoleEcho(true);
if (EOF == getchar()) {
fprintf(stderr, "getchar() return EOF\n");
}
} else {
tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN);
strcpy(argv[i], "-p");
}
arguments->password = g_password;
arguments->is_use_passwd = true;
}
}
}
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
if (argc > 1) {
parse_args(argc, argv, arguments);
}
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (arguments->abort) {
#ifndef _ALPINE
#if 0
error(10, 0, "ABORTED");
#endif
#else
abort();
#endif
}
}
int32_t shellReadCommand(TAOS *con, char *command) {
unsigned hist_counter = history.hend;
char utf8_array[10] = "\0";
Command cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.buffer = (char *)calloc(1, MAX_COMMAND_SIZE);
cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE);
showOnScreen(&cmd);
// Read input.
char c;
while (1) {
c = (char)getchar(); // getchar() return an 'int' value
if (c == EOF) {
return c;
}
if (c < 0) { // For UTF-8
int count = countPrefixOnes(c);
utf8_array[0] = c;
for (int k = 1; k < count; k++) {
c = (char)getchar();
utf8_array[k] = c;
}
insertChar(&cmd, utf8_array, count);
} else if (c < '\033') {
// Ctrl keys. TODO: Implement ctrl combinations
switch (c) {
case 1: // ctrl A
positionCursorHome(&cmd);
break;
case 3:
printf("\n");
resetCommand(&cmd, "");
kill(0, SIGINT);
break;
case 4: // EOF or Ctrl+D
printf("\n");
taos_close(con);
// write the history
write_history();
exitShell();
break;
case 5: // ctrl E
positionCursorEnd(&cmd);
break;
case 8:
backspaceChar(&cmd);
break;
case '\n':
case '\r':
printf("\n");
if (isReadyGo(&cmd)) {
sprintf(command, "%s%s", cmd.buffer, cmd.command);
tfree(cmd.buffer);
tfree(cmd.command);
return 0;
} else {
updateBuffer(&cmd);
}
break;
case 11: // Ctrl + K;
clearLineAfter(&cmd);
break;
case 12: // Ctrl + L;
system("clear");
showOnScreen(&cmd);
break;
case 21: // Ctrl + U;
clearLineBefore(&cmd);
break;
}
} else if (c == '\033') {
c = (char)getchar();
switch (c) {
case '[':
c = (char)getchar();
switch (c) {
case 'A': // Up arrow
if (hist_counter != history.hstart) {
hist_counter = (hist_counter + MAX_HISTORY_SIZE - 1) % MAX_HISTORY_SIZE;
resetCommand(&cmd, (history.hist[hist_counter] == NULL) ? "" : history.hist[hist_counter]);
}
break;
case 'B': // Down arrow
if (hist_counter != history.hend) {
int next_hist = (hist_counter + 1) % MAX_HISTORY_SIZE;
if (next_hist != history.hend) {
resetCommand(&cmd, (history.hist[next_hist] == NULL) ? "" : history.hist[next_hist]);
} else {
resetCommand(&cmd, "");
}
hist_counter = next_hist;
}
break;
case 'C': // Right arrow
moveCursorRight(&cmd);
break;
case 'D': // Left arrow
moveCursorLeft(&cmd);
break;
case '1':
if ((c = (char)getchar()) == '~') {
// Home key
positionCursorHome(&cmd);
}
break;
case '2':
if ((c = (char)getchar()) == '~') {
// Insert key
}
break;
case '3':
if ((c = (char)getchar()) == '~') {
// Delete key
deleteChar(&cmd);
}
break;
case '4':
if ((c = (char)getchar()) == '~') {
// End key
positionCursorEnd(&cmd);
}
break;
case '5':
if ((c = (char)getchar()) == '~') {
// Page up key
}
break;
case '6':
if ((c = (char)getchar()) == '~') {
// Page down key
}
break;
case 72:
// Home key
positionCursorHome(&cmd);
break;
case 70:
// End key
positionCursorEnd(&cmd);
break;
}
break;
}
} else if (c == 0x7f) {
// press delete key
backspaceChar(&cmd);
} else {
insertChar(&cmd, &c, 1);
}
}
return 0;
}
void *shellLoopQuery(void *arg) {
if (indicator) {
getOldTerminalMode();
indicator = 0;
}
TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
taosThreadCleanupPush(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE);
if (command == NULL){
uError("failed to malloc command");
return NULL;
}
int32_t err = 0;
do {
// Read command from shell.
memset(command, 0, MAX_COMMAND_SIZE);
setTerminalMode();
err = shellReadCommand(con, command);
if (err) {
break;
}
resetTerminalMode();
} while (shellRunCommand(con, command) == 0);
tfree(command);
exitShell();
taosThreadCleanupPop(1);
return NULL;
}
void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "%s/%s", getenv("HOME"), HISTORY_FILE); }
void clearScreen(int ecmd_pos, int cursor_pos) {
struct winsize w;
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
//fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n");
w.ws_col = 120;
w.ws_row = 30;
}
int cursor_x = cursor_pos / w.ws_col;
int cursor_y = cursor_pos % w.ws_col;
int command_x = ecmd_pos / w.ws_col;
positionCursor(cursor_y, LEFT);
positionCursor(command_x - cursor_x, DOWN);
fprintf(stdout, "\033[2K");
for (int i = 0; i < command_x; i++) {
positionCursor(1, UP);
fprintf(stdout, "\033[2K");
}
fflush(stdout);
}
void showOnScreen(Command *cmd) {
struct winsize w;
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
//fprintf(stderr, "No stream device\n");
w.ws_col = 120;
w.ws_row = 30;
}
TdWchar wc;
int size = 0;
// Print out the command.
char *total_string = malloc(MAX_COMMAND_SIZE);
memset(total_string, '\0', MAX_COMMAND_SIZE);
if (strcmp(cmd->buffer, "") == 0) {
sprintf(total_string, "%s%s", PROMPT_HEADER, cmd->command);
} else {
sprintf(total_string, "%s%s", CONTINUE_PROMPT, cmd->command);
}
int remain_column = w.ws_col;
/* size = cmd->commandSize + prompt_size; */
for (char *str = total_string; size < cmd->commandSize + prompt_size;) {
int ret = taosMbToWchar(&wc, str, MB_CUR_MAX);
if (ret < 0) break;
size += ret;
/* assert(size >= 0); */
int width = taosWcharWidth(wc);
if (remain_column > width) {
printf("%lc", wc);
remain_column -= width;
} else {
if (remain_column == width) {
printf("%lc\n\r", wc);
remain_column = w.ws_col;
} else {
printf("\n\r%lc", wc);
remain_column = w.ws_col - width;
}
}
str = total_string + size;
}
free(total_string);
/* for (int i = 0; i < size; i++){ */
/* char c = total_string[i]; */
/* if (k % w.ws_col == 0) { */
/* printf("%c\n\r", c); */
/* } */
/* else { */
/* printf("%c", c); */
/* } */
/* k += 1; */
/* } */
// Position the cursor
int cursor_pos = cmd->screenOffset + prompt_size;
int ecmd_pos = cmd->endOffset + prompt_size;
int cursor_x = cursor_pos / w.ws_col;
int cursor_y = cursor_pos % w.ws_col;
// int cursor_y = cursor % w.ws_col;
int command_x = ecmd_pos / w.ws_col;
int command_y = ecmd_pos % w.ws_col;
// int command_y = (command.size() + prompt_size) % w.ws_col;
positionCursor(command_y, LEFT);
positionCursor(command_x, UP);
positionCursor(cursor_x, DOWN);
positionCursor(cursor_y, RIGHT);
fflush(stdout);
}
void cleanup_handler(void *arg) { resetTerminalMode(); }
void exitShell() {
/*int32_t ret =*/ resetTerminalMode();
taos_cleanup();
exit(EXIT_SUCCESS);
}
...@@ -13,13 +13,524 @@ ...@@ -13,13 +13,524 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define __USE_XOPEN
#include "os.h" #include "os.h"
#include "shell.h" #include "shell.h"
#include "tglobal.h" #include "tglobal.h"
#include "shellCommand.h"
#include "tbase64.h"
#include "tlog.h"
#include "version.h"
#include <wordexp.h>
#include <argp.h>
#include <termio.h>
#define OPT_ABORT 1 /* abort */
int indicator = 1;
void insertChar(Command *cmd, char *c, int size);
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
static char doc[] = "";
static char args_doc[] = "";
TdThread pid; TdThread pid;
static tsem_t cancelSem; static tsem_t cancelSem;
static struct argp_option options[] = {
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
{"password", 'p', 0, 0, "The password to use when connecting to the server."},
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
{"dump-config", 'C', 0, 0, "Dump configuration."},
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
{"raw-time", 'r', 0, 0, "Output time as uint64_t."},
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
{"check", 'k', "CHECK", 0, "Check tables."},
{"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."},
{"timezone", 'z', "TIMEZONE", 0, "Time zone of the shell, default is local."},
{"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync|speen|fqdn."},
{"pktlen", 'l', "PKTLEN", 0, "Packet length used for net test, default is 1000 bytes."},
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Packet type used for net test, default is TCP."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
SShellArguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
case 'h':
arguments->host = arg;
break;
case 'p':
break;
case 'P':
if (arg) {
arguments->port = atoi(arg);
} else {
fprintf(stderr, "Invalid port\n");
return -1;
}
break;
case 'z':
arguments->timezone = arg;
break;
case 'u':
arguments->user = arg;
break;
case 'A':
arguments->auth = arg;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
if (strlen(full_path.we_wordv[0]) >= TSDB_FILENAME_LEN) {
fprintf(stderr, "config file path: %s overflow max len %d\n", full_path.we_wordv[0], TSDB_FILENAME_LEN - 1);
wordfree(&full_path);
return -1;
}
tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'C':
arguments->dump_config = true;
break;
case 's':
arguments->commands = arg;
break;
case 'r':
arguments->is_raw_time = true;
break;
case 'f':
if ((0 == strlen(arg)) || (wordexp(arg, &full_path, 0) != 0)) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->file, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'D':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->dir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
wordfree(&full_path);
break;
case 'T':
if (arg) {
arguments->threadNum = atoi(arg);
} else {
fprintf(stderr, "Invalid number of threads\n");
return -1;
}
break;
case 'k':
arguments->check = atoi(arg);
break;
case 'd':
arguments->database = arg;
break;
case 'n':
arguments->netTestRole = arg;
break;
case 'l':
if (arg) {
arguments->pktLen = atoi(arg);
} else {
fprintf(stderr, "Invalid packet length\n");
return -1;
}
break;
case 'N':
if (arg) {
arguments->pktNum = atoi(arg);
} else {
fprintf(stderr, "Invalid packet number\n");
return -1;
}
break;
case 'S':
arguments->pktType = arg;
break;
case OPT_ABORT:
arguments->abort = 1;
break;
default:
return ARGP_ERR_UNKNOWN;
}
return 0;
}
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
char g_password[SHELL_MAX_PASSWORD_LEN];
static void parse_args(
int argc, char *argv[], SShellArguments *arguments) {
for (int i = 1; i < argc; i++) {
if ((strncmp(argv[i], "-p", 2) == 0)
|| (strncmp(argv[i], "--password", 10) == 0)) {
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
if ((strlen(argv[i]) == 2)
|| (strncmp(argv[i], "--password", 10) == 0)) {
printf("Enter password: ");
taosSetConsoleEcho(false);
if (scanf("%20s", g_password) > 1) {
fprintf(stderr, "password reading error\n");
}
taosSetConsoleEcho(true);
if (EOF == getchar()) {
fprintf(stderr, "getchar() return EOF\n");
}
} else {
tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN);
strcpy(argv[i], "-p");
}
arguments->password = g_password;
arguments->is_use_passwd = true;
}
}
}
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
if (argc > 1) {
parse_args(argc, argv, arguments);
}
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (arguments->abort) {
#ifndef _ALPINE
#if 0
error(10, 0, "ABORTED");
#endif
#else
abort();
#endif
}
}
int32_t shellReadCommand(TAOS *con, char *command) {
unsigned hist_counter = history.hend;
char utf8_array[10] = "\0";
Command cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.buffer = (char *)calloc(1, MAX_COMMAND_SIZE);
cmd.command = (char *)calloc(1, MAX_COMMAND_SIZE);
showOnScreen(&cmd);
// Read input.
char c;
while (1) {
c = (char)getchar(); // getchar() return an 'int' value
if (c == EOF) {
return c;
}
if (c < 0) { // For UTF-8
int count = countPrefixOnes(c);
utf8_array[0] = c;
for (int k = 1; k < count; k++) {
c = (char)getchar();
utf8_array[k] = c;
}
insertChar(&cmd, utf8_array, count);
} else if (c < '\033') {
// Ctrl keys. TODO: Implement ctrl combinations
switch (c) {
case 1: // ctrl A
positionCursorHome(&cmd);
break;
case 3:
printf("\n");
resetCommand(&cmd, "");
kill(0, SIGINT);
break;
case 4: // EOF or Ctrl+D
printf("\n");
taos_close(con);
// write the history
write_history();
exitShell();
break;
case 5: // ctrl E
positionCursorEnd(&cmd);
break;
case 8:
backspaceChar(&cmd);
break;
case '\n':
case '\r':
printf("\n");
if (isReadyGo(&cmd)) {
sprintf(command, "%s%s", cmd.buffer, cmd.command);
tfree(cmd.buffer);
tfree(cmd.command);
return 0;
} else {
updateBuffer(&cmd);
}
break;
case 11: // Ctrl + K;
clearLineAfter(&cmd);
break;
case 12: // Ctrl + L;
system("clear");
showOnScreen(&cmd);
break;
case 21: // Ctrl + U;
clearLineBefore(&cmd);
break;
}
} else if (c == '\033') {
c = (char)getchar();
switch (c) {
case '[':
c = (char)getchar();
switch (c) {
case 'A': // Up arrow
if (hist_counter != history.hstart) {
hist_counter = (hist_counter + MAX_HISTORY_SIZE - 1) % MAX_HISTORY_SIZE;
resetCommand(&cmd, (history.hist[hist_counter] == NULL) ? "" : history.hist[hist_counter]);
}
break;
case 'B': // Down arrow
if (hist_counter != history.hend) {
int next_hist = (hist_counter + 1) % MAX_HISTORY_SIZE;
if (next_hist != history.hend) {
resetCommand(&cmd, (history.hist[next_hist] == NULL) ? "" : history.hist[next_hist]);
} else {
resetCommand(&cmd, "");
}
hist_counter = next_hist;
}
break;
case 'C': // Right arrow
moveCursorRight(&cmd);
break;
case 'D': // Left arrow
moveCursorLeft(&cmd);
break;
case '1':
if ((c = (char)getchar()) == '~') {
// Home key
positionCursorHome(&cmd);
}
break;
case '2':
if ((c = (char)getchar()) == '~') {
// Insert key
}
break;
case '3':
if ((c = (char)getchar()) == '~') {
// Delete key
deleteChar(&cmd);
}
break;
case '4':
if ((c = (char)getchar()) == '~') {
// End key
positionCursorEnd(&cmd);
}
break;
case '5':
if ((c = (char)getchar()) == '~') {
// Page up key
}
break;
case '6':
if ((c = (char)getchar()) == '~') {
// Page down key
}
break;
case 72:
// Home key
positionCursorHome(&cmd);
break;
case 70:
// End key
positionCursorEnd(&cmd);
break;
}
break;
}
} else if (c == 0x7f) {
// press delete key
backspaceChar(&cmd);
} else {
insertChar(&cmd, &c, 1);
}
}
return 0;
}
void *shellLoopQuery(void *arg) {
if (indicator) {
getOldTerminalMode();
indicator = 0;
}
TAOS *con = (TAOS *)arg;
setThreadName("shellLoopQuery");
taosThreadCleanupPush(cleanup_handler, NULL);
char *command = malloc(MAX_COMMAND_SIZE);
if (command == NULL){
uError("failed to malloc command");
return NULL;
}
int32_t err = 0;
do {
// Read command from shell.
memset(command, 0, MAX_COMMAND_SIZE);
setTerminalMode();
err = shellReadCommand(con, command);
if (err) {
break;
}
resetTerminalMode();
} while (shellRunCommand(con, command) == 0);
tfree(command);
exitShell();
taosThreadCleanupPop(1);
return NULL;
}
void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "%s/%s", getenv("HOME"), HISTORY_FILE); }
void clearScreen(int ecmd_pos, int cursor_pos) {
struct winsize w;
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
//fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n");
w.ws_col = 120;
w.ws_row = 30;
}
int cursor_x = cursor_pos / w.ws_col;
int cursor_y = cursor_pos % w.ws_col;
int command_x = ecmd_pos / w.ws_col;
positionCursor(cursor_y, LEFT);
positionCursor(command_x - cursor_x, DOWN);
fprintf(stdout, "\033[2K");
for (int i = 0; i < command_x; i++) {
positionCursor(1, UP);
fprintf(stdout, "\033[2K");
}
fflush(stdout);
}
void showOnScreen(Command *cmd) {
struct winsize w;
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
//fprintf(stderr, "No stream device\n");
w.ws_col = 120;
w.ws_row = 30;
}
TdWchar wc;
int size = 0;
// Print out the command.
char *total_string = malloc(MAX_COMMAND_SIZE);
memset(total_string, '\0', MAX_COMMAND_SIZE);
if (strcmp(cmd->buffer, "") == 0) {
sprintf(total_string, "%s%s", PROMPT_HEADER, cmd->command);
} else {
sprintf(total_string, "%s%s", CONTINUE_PROMPT, cmd->command);
}
int remain_column = w.ws_col;
/* size = cmd->commandSize + prompt_size; */
for (char *str = total_string; size < cmd->commandSize + prompt_size;) {
int ret = taosMbToWchar(&wc, str, MB_CUR_MAX);
if (ret < 0) break;
size += ret;
/* assert(size >= 0); */
int width = taosWcharWidth(wc);
if (remain_column > width) {
printf("%lc", wc);
remain_column -= width;
} else {
if (remain_column == width) {
printf("%lc\n\r", wc);
remain_column = w.ws_col;
} else {
printf("\n\r%lc", wc);
remain_column = w.ws_col - width;
}
}
str = total_string + size;
}
free(total_string);
/* for (int i = 0; i < size; i++){ */
/* char c = total_string[i]; */
/* if (k % w.ws_col == 0) { */
/* printf("%c\n\r", c); */
/* } */
/* else { */
/* printf("%c", c); */
/* } */
/* k += 1; */
/* } */
// Position the cursor
int cursor_pos = cmd->screenOffset + prompt_size;
int ecmd_pos = cmd->endOffset + prompt_size;
int cursor_x = cursor_pos / w.ws_col;
int cursor_y = cursor_pos % w.ws_col;
// int cursor_y = cursor % w.ws_col;
int command_x = ecmd_pos / w.ws_col;
int command_y = ecmd_pos % w.ws_col;
// int command_y = (command.size() + prompt_size) % w.ws_col;
positionCursor(command_y, LEFT);
positionCursor(command_x, UP);
positionCursor(cursor_x, DOWN);
positionCursor(cursor_y, RIGHT);
fflush(stdout);
}
void cleanup_handler(void *arg) { resetTerminalMode(); }
void exitShell() {
/*int32_t ret =*/ resetTerminalMode();
taos_cleanup();
exit(EXIT_SUCCESS);
}
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
tsem_post(&cancelSem); tsem_post(&cancelSem);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册