提交 655b1626 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

......@@ -18,6 +18,7 @@
#include <string.h>
#include <time.h>
#include "taos.h"
#include <stdlib.h>
static int running = 1;
static void msg_process(TAOS_RES* msg) {
......@@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) {
void* meta;
int32_t metaLen;
tmq_get_raw_meta(msg, &meta, &metaLen);
char* result = tmq_get_json_meta(msg);
if(result){
printf("meta result: %s\n", result);
free(result);
}
printf("meta, len is %d\n", metaLen);
return;
}
......@@ -119,6 +124,104 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
if (taos_errno(pRes) != 0) {
printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table st1");
if (taos_errno(pRes) != 0) {
printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
if (taos_errno(pRes) != 0) {
printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "alter table n1 drop column c1");
if (taos_errno(pRes) != 0) {
printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
......@@ -137,8 +240,8 @@ int32_t create_topic() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -199,7 +302,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
......
......@@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
......
......@@ -13,6 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_COMMAND_H
#define TDENGINE_COMMAND_H
#include "cmdnodes.h"
#include "tmsg.h"
#include "plannodes.h"
......@@ -27,4 +30,4 @@ int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
#endif
......@@ -843,19 +843,25 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
if (taos_num_fields(pRequest) == 0) {
// this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
// All data has returned to App already, no need to try again
if (pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// all data has returned to App already, no need to try again
if ((pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) && pResultInfo->completed) {
pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
// it is a local executed query, no need to do async fetch
if (pResultInfo->current < pResultInfo->numOfRows && pRequest->body.queryJob == 0) {
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return;
}
SSchedulerReq req = {
......@@ -869,14 +875,14 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
pResultInfo->current = pResultInfo->numOfRows;
pResultInfo->convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
}
......
......@@ -23,6 +23,7 @@
#include "tqueue.h"
#include "tref.h"
#include "ttimer.h"
#include "cJSON.h"
int32_t tmqAskEp(tmq_t* tmq, bool async);
......@@ -1683,7 +1684,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew;
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
......@@ -1848,6 +1850,406 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len)
return -1;
}
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
return string;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0};
sprintf(uid, "%"PRIi64, id);
cJSON* id_ = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id_);
cJSON* tableName = cJSON_CreateString(name);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
cJSON_AddItemToObject(json, "tableType", tableType);
// cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version);
cJSON* columns = cJSON_CreateArray();
for(int i = 0; i < schemaRow->nCols; i++){
cJSON* column = cJSON_CreateObject();
SSchema *s = schemaRow->pSchema + i;
cJSON* cname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(column, "name", cname);
cJSON* ctype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(column, "type", ctype);
if(s->type == TSDB_DATA_TYPE_BINARY){
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(column, "length", cbytes);
}
cJSON_AddItemToArray(columns, column);
}
cJSON_AddItemToObject(json, "columns", columns);
cJSON* tags = cJSON_CreateArray();
for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
cJSON* tag = cJSON_CreateObject();
SSchema *s = schemaTag->pSchema + i;
cJSON* tname = cJSON_CreateString(s->name);
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(s->type);
cJSON_AddItemToObject(tag, "type", ttype);
if(s->type == TSDB_DATA_TYPE_BINARY){
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes);
}else if (s->type == TSDB_DATA_TYPE_NCHAR){
int32_t length = (s->bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(tag, "length", cbytes);
}
cJSON_AddItemToArray(tags, tag);
}
cJSON_AddItemToObject(json, "tags", tags);
string = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
return string;
}
static char *processCreateStb(SMqMetaRsp *metaRsp){
SVCreateStbReq req = {0};
SDecoder coder;
char* string = NULL;
// decode and process req
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err;
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
tDecoderClear(&coder);
return string;
_err:
tDecoderClear(&coder);
return string;
}
static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
return string;
}
cJSON* type = cJSON_CreateString("create");
cJSON_AddItemToObject(json, "type", type);
char cid[32] = {0};
sprintf(cid, "%"PRIi64, id);
cJSON* cid_ = cJSON_CreateString(cid);
cJSON_AddItemToObject(json, "id", cid_);
cJSON* tableName = cJSON_CreateString(name);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString("child");
cJSON_AddItemToObject(json, "tableType", tableType);
char sid_[32] = {0};
sprintf(sid_, "%"PRIi64, sid);
cJSON* using = cJSON_CreateString(sid_);
cJSON_AddItemToObject(json, "using", using);
// cJSON* version = cJSON_CreateNumber(1);
// cJSON_AddItemToObject(json, "version", version);
cJSON* tags = cJSON_CreateArray();
if (tTagIsJson(pTag)) { // todo
char* pJson = parseTagDatatoJson(pTag);
cJSON* tag = cJSON_CreateObject();
cJSON* tname = cJSON_CreateString("unknown"); // todo
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
cJSON_AddItemToObject(tag, "type", ttype);
cJSON* tvalue = cJSON_CreateString(pJson);
cJSON_AddItemToObject(tag, "value", tvalue);
cJSON_AddItemToArray(tags, tag);
cJSON_AddItemToObject(json, "tags", tags);
string = cJSON_PrintUnformatted(json);
goto end;
}
SArray* pTagVals = NULL;
int32_t code = tTagToValArray(pTag, &pTagVals);
if (code) {
goto end;
}
for(int i = 0; i < taosArrayGetSize(pTagVals); i++){
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
cJSON* tag = cJSON_CreateObject();
// cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
cJSON* tname = cJSON_CreateString("unkonwn"); // todo
cJSON_AddItemToObject(tag, "name", tname);
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
cJSON_AddItemToObject(tag, "type", ttype);
char* buf = NULL;
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
dataConverToStr(buf, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
} else {
buf = taosMemoryCalloc(32, 1);
dataConverToStr(buf, pTagVal->type, &pTagVal->i64, tDataTypes[pTagVal->type].bytes, NULL);
}
cJSON* tvalue = cJSON_CreateString(buf);
taosMemoryFree(buf);
cJSON_AddItemToObject(tag, "value", tvalue);
cJSON_AddItemToArray(tags, tag);
}
cJSON_AddItemToObject(json, "tags", tags);
string = cJSON_PrintUnformatted(json);
end:
cJSON_Delete(json);
return string;
}
static char *processCreateTable(SMqMetaRsp *metaRsp){
SDecoder decoder = {0};
SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq;
char *string = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
goto _exit;
}
// loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE){
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
}else if(pCreateReq->type == TSDB_NORMAL_TABLE){
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
}
}
tDecoderClear(&decoder);
_exit:
tDecoderClear(&decoder);
return string;
}
static char *processAlterTable(SMqMetaRsp *metaRsp){
SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0};
char *string = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
cJSON* type = cJSON_CreateString("alter");
cJSON_AddItemToObject(json, "type", type);
// cJSON* uid = cJSON_CreateNumber(id);
// cJSON_AddItemToObject(json, "uid", uid);
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ? "child" : "normal");
cJSON_AddItemToObject(json, "tableType", tableType);
switch (vAlterTbReq.action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: {
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType);
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}
break;
}
case TSDB_ALTER_TABLE_DROP_COLUMN:{
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
cJSON_AddItemToObject(json, "colType", colType);
if(vAlterTbReq.type == TSDB_DATA_TYPE_BINARY){
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR){
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE)/TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length);
cJSON_AddItemToObject(json, "colLength", cbytes);
}
break;
}
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
cJSON_AddItemToObject(json, "colName", colName);
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
cJSON_AddItemToObject(json, "colNewName", colNewName);
break;
}
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL:{
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_TAG_VAL);
cJSON_AddItemToObject(json, "alterType", alterType);
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
cJSON_AddItemToObject(json, "colName", tagName);
cJSON* colValue = cJSON_CreateString("invalid, todo"); // todo
cJSON_AddItemToObject(json, "colValue", colValue);
cJSON* isNull = cJSON_CreateBool(vAlterTbReq.isNull);
cJSON_AddItemToObject(json, "colValueNull", isNull);
break;
}
default:
break;
}
string = cJSON_PrintUnformatted(json);
_exit:
tDecoderClear(&decoder);
return string;
}
static char *processDropSTable(SMqMetaRsp *metaRsp){
SDecoder decoder = {0};
SVDropStbReq req = {0};
char *string = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type);
char uid[32] = {0};
sprintf(uid, "%"PRIi64, req.suid);
cJSON* id = cJSON_CreateString(uid);
cJSON_AddItemToObject(json, "id", id);
cJSON* tableName = cJSON_CreateString(req.name);
cJSON_AddItemToObject(json, "tableName", tableName);
cJSON* tableType = cJSON_CreateString("super");
cJSON_AddItemToObject(json, "tableType", tableType);
string = cJSON_PrintUnformatted(json);
_exit:
tDecoderClear(&decoder);
return string;
}
static char *processDropTable(SMqMetaRsp *metaRsp){
SDecoder decoder = {0};
SVDropTbBatchReq req = {0};
char *string = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
tDecoderInit(&decoder, data, len);
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
cJSON* type = cJSON_CreateString("drop");
cJSON_AddItemToObject(json, "type", type);
// cJSON* uid = cJSON_CreateNumber(id);
// cJSON_AddItemToObject(json, "uid", uid);
// cJSON* tableType = cJSON_CreateString("normal");
// cJSON_AddItemToObject(json, "tableType", tableType);
cJSON* tableNameList = cJSON_CreateArray();
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
cJSON_AddItemToArray(tableNameList, tableName);
}
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json);
_exit:
tDecoderClear(&decoder);
return string;
}
char *tmq_get_json_meta(TAOS_RES *res){
if (!TD_RES_TMQ_META(res)) {
return NULL;
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){
return processCreateStb(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){
return processCreateStb(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){
return processDropSTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){
return processCreateTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){
return processAlterTable(&pMetaRspObj->metaRsp);
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){
return processDropTable(&pMetaRspObj->metaRsp);
}
return NULL;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}
......
......@@ -169,7 +169,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_DROP_TTL_TABLE:
//if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
if (vnodeProcessDropTtlTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
break;
case TDMT_VND_CREATE_SMA: {
if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
......
......@@ -565,7 +565,6 @@ static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** p
}
int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
int32_t numOfCols = LIST_LENGTH(pProjects);
blockDataEnsureCapacity(pBlock, 1);
int32_t index = 0;
......@@ -579,7 +578,6 @@ int32_t buildSelectResultDataBlock(SNodeList* pProjects, SSDataBlock* pBlock) {
}
pBlock->info.rows = 1;
return TSDB_CODE_SUCCESS;
}
......
......@@ -1067,6 +1067,10 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
}
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->igExpired = pWindowLogicNode->igExpired;
......
......@@ -265,7 +265,6 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
// tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
......@@ -276,7 +275,20 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
*(str + bufSize + 1) = '"';
n = bufSize + 2;
break;
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
// tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
}
*str = '"';
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
if (length <= 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
*(str + length + 1) = '"';
n = length + 2;
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
......@@ -298,7 +310,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
return TSDB_CODE_TSC_INVALID_VALUE;
}
*len = n;
if(len) *len = n;
return TSDB_CODE_SUCCESS;
}
......
......@@ -13,13 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
SSchedulerMgmt schMgmt = {
.jobRef = -1,
......
......@@ -314,7 +314,6 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to get the root page of the an existing btree
SPgno pgno;
SPage *pPage;
int ret;
......@@ -1993,6 +1992,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
const void *pTKey;
int tkLen;
tdbTrace("ttl moveto, pager:%p, ipage:%d", pPager, pBtc->iPage);
if (pBtc->iPage < 0) {
// move from a clear cursor
ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
......
......@@ -121,9 +121,10 @@ SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
hash = tdbCstringHash(fname);
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
tdbTrace("tdbttl getPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
}
tdbTrace("tdbttl getPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, fname);
return *ppPager;
}
......@@ -143,9 +144,12 @@ void tdbEnvAddPager(TDB *pDb, SPager *pPager) {
// add to hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pDb->pgrHash[hash % pDb->nPgrHash];
tdbTrace("tdbttl addPager1: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
pPager->pHashNext = *ppPager;
*ppPager = pPager;
tdbTrace("tdbttl addPager2: pager:%p, index:%d, name:%s", *ppPager, hash % pDb->nPgrHash, pPager->dbFileName);
// increase the counter
pDb->nPager++;
}
......
......@@ -230,6 +230,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
}
}
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
......@@ -285,6 +286,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
return -1;
}
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
// init page if need
if (!TDB_PAGE_INITIALIZED(pPage)) {
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
......@@ -363,10 +365,12 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
pgno = TDB_PAGE_PGNO(pPage);
tdbTrace("tdbttl init pager:%p, pgno:%d, loadPage:%d, size:%d", pPager, pgno, loadPage, pPager->dbOrigSize);
if (loadPage && pgno <= pPager->dbOrigSize) {
init = 1;
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * (pgno - 1));
tdbTrace("tdbttl pager:%p, pgno:%d, nRead:%ld", pPager, pgno, nRead);
if (nRead < pPage->pageSize) {
ASSERT(0);
return -1;
......
......@@ -33,9 +33,9 @@ class TDTestCase:
def init(self, conn, logSql):
self.testcasePath = os.path.split(__file__)[0]
self.testcaseFilename = os.path.split(__file__)[-1]
os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
# os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
tdSql.init(conn.cursor(), True)
def run(self):
# tdSql.prepare()
......
......@@ -27,8 +27,7 @@ python3 ./test.py -f 1-insert/table_comment.py
python3 ./test.py -f 1-insert/time_range_wise.py
python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py
#python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 2-query/between.py
python3 ./test.py -f 2-query/distinct.py
......@@ -118,7 +117,7 @@ python3 ./test.py -f 2-query/distribute_agg_avg.py
python3 ./test.py -f 2-query/distribute_agg_stddev.py
python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 2-query/irate.py
python3 ./test.py -f 2-query/and_or_for_byte.py
#python3 ./test.py -f 2-query/and_or_for_byte.py
python3 ./test.py -f 2-query/function_null.py
python3 ./test.py -f 2-query/queryQnode.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册