提交 35d220c8 编写于 作者: Y yihaoDeng

fix crash

...@@ -328,6 +328,7 @@ typedef struct { ...@@ -328,6 +328,7 @@ typedef struct {
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex* pColumnIndex; SColumnIndex* pColumnIndex;
TAOS_FIELD* final;
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
struct SLocalMerger *pLocalMerger; struct SLocalMerger *pLocalMerger;
} SSqlRes; } SSqlRes;
......
...@@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return ret; return ret;
} }
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true); code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) { if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code; return code;
...@@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} else { } else {
sql = sToken.z; sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false); code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
if (pCmd->curSql == NULL) { if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS); assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
...@@ -952,10 +960,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC ...@@ -952,10 +960,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} }
*sqlstr = sql; *sqlstr = sql;
if (*sqlstr == NULL) {
code = TSDB_CODE_TSC_INVALID_SQL;
}
return code; return code;
} }
......
...@@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { ...@@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
SNewVgroupInfo info = {0}; SNewVgroupInfo info = {0};
info.numOfEps = pVgroupMsg->numOfEps; info.numOfEps = pVgroupMsg->numOfEps;
info.vgId = pVgroupMsg->vgId; info.vgId = pVgroupMsg->vgId;
info.inUse = 0; info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
assert(info.numOfEps >= 1 && info.vgId >= 1);
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) { for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN); tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
info.ep[i].port = pVgroupMsg->epAddr[i].port; info.ep[i].port = pVgroupMsg->epAddr[i].port;
......
...@@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0}; ...@@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0};
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub); void tscSaveSubscriptionProgress(void* sub);
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) { static int32_t getWaitingTimeInterval(int32_t count) {
...@@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) { ...@@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) {
for (int32_t i = 0; i < s->numOfEps; i++) { for (int32_t i = 0; i < s->numOfEps; i++) {
s->port[i] = htons(s->port[i]); s->port[i] = htons(s->port[i]);
} }
} }
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) { bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false; return false;
...@@ -111,19 +113,22 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou ...@@ -111,19 +113,22 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou
} }
} }
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
SSqlCmd *pCmd = &pObj->cmd; SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
return; return;
} }
int32_t vgId = pTableMetaInfo->pTableMeta->vgId; int32_t vgId = -1;
if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) { if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) {
assert(vgId == 0); vgId = extractSTableQueryVgroupId(pTableMetaInfo);
return; } else {
vgId = pTableMetaInfo->pTableMeta->vgId;
} }
assert(vgId > 0);
SNewVgroupInfo vgroupInfo = {.vgId = -1}; SNewVgroupInfo vgroupInfo = {.vgId = -1};
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0); assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
...@@ -138,6 +143,33 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { ...@@ -138,6 +143,33 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps); tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo)); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
// Update the local cached epSet info cached by SqlObj
int32_t inUse = pSql->epSet.inUse;
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse);
}
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
assert(pTableMetaInfo != NULL);
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
int32_t vgId = -1;
if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
} else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
SVgroupTableInfo *pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
vgId = pTableIdList->vgInfo.vgId;
}
return vgId;
} }
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
...@@ -515,21 +547,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -515,21 +547,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t vgIndex = pTableMetaInfo->vgroupIndex; int32_t vgIndex = pTableMetaInfo->vgroupIndex;
int32_t vgId = -1;
if (pTableMetaInfo->pVgroupTables == NULL) { if (pTableMetaInfo->pVgroupTables == NULL) {
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList; SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId);
} else { } else {
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(vgIndex >= 0 && vgIndex < numOfVgroups); assert(vgIndex >= 0 && vgIndex < numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
vgId = pTableIdList->vgInfo.vgId;
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId);
} }
pRetrieveMsg->header.vgId = htonl(vgId);
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, vgId, vgIndex, pSql->res.qId);
} else { } else {
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
...@@ -1980,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { ...@@ -1980,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it (vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup); vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
} }
} }
...@@ -2132,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -2132,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
tscError("%p empty vgroup info", pSql); tscError("%p empty vgroup info", pSql);
} else { } else {
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) { for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
//just init, no need to lock // just init, no need to lock
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
pVgroups->vgId = htonl(vmsg->vgId); vmsg->vgId = htonl(vmsg->vgId);
pVgroups->numOfEps = vmsg->numOfEps; vmsg->numOfEps = vmsg->numOfEps;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
}
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
pVgroup->numOfEps = newVi.numOfEps;
pVgroup->vgId = newVi.vgId;
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
pVgroup->epAddr[k].port = newVi.ep[k].port;
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
}
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1); // check if current buffer contains the vgroup info.
// If not, add it
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port); (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn)); taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
} }
} }
} }
......
...@@ -405,6 +405,7 @@ int taos_affected_rows(TAOS_RES *tres) { ...@@ -405,6 +405,7 @@ int taos_affected_rows(TAOS_RES *tres) {
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0; if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
...@@ -419,7 +420,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -419,7 +420,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo; SFieldInfo *pFieldInfo = &pQueryInfo->fieldsInfo;
if (pFieldInfo->final == NULL) { if (pRes->final == NULL) {
TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD)); TAOS_FIELD* f = calloc(pFieldInfo->numOfOutput, sizeof(TAOS_FIELD));
int32_t j = 0; int32_t j = 0;
...@@ -439,10 +440,10 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -439,10 +440,10 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
} }
} }
pFieldInfo->final = f; pRes->final = f;
} }
return pFieldInfo->final; return pRes->final;
} }
static bool needToFetchNewBlock(SSqlObj* pSql) { static bool needToFetchNewBlock(SSqlObj* pSql) {
......
...@@ -429,6 +429,8 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -429,6 +429,8 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
tfree(pRes->pArithSup->data); tfree(pRes->pArithSup->data);
tfree(pRes->pArithSup); tfree(pRes->pArithSup);
} }
tfree(pRes->final);
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
...@@ -1176,7 +1178,6 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) { ...@@ -1176,7 +1178,6 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
} }
taosArrayDestroy(pFieldInfo->internalField); taosArrayDestroy(pFieldInfo->internalField);
tfree(pFieldInfo->final);
memset(pFieldInfo, 0, sizeof(SFieldInfo)); memset(pFieldInfo, 0, sizeof(SFieldInfo));
} }
......
...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting; ...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
extern char tsEmail[]; extern char tsEmail[];
extern char tsArbitrator[]; extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int32_t tsDnodeId;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
......
...@@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1; ...@@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1;
int8_t tsEnableTelemetryReporting = 1; int8_t tsEnableTelemetryReporting = 1;
int8_t tsArbOnline = 0; int8_t tsArbOnline = 0;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
// common // common
int32_t tsRpcTimer = 1000; int32_t tsRpcTimer = 1000;
...@@ -212,7 +213,7 @@ float tsAvailTmpDirectorySpace = 0; ...@@ -212,7 +213,7 @@ float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0; float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0; float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f; float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 1.0f; float tsMinimalDataDirGB = 2.0f;
int32_t tsTotalMemoryMB = 0; int32_t tsTotalMemoryMB = 0;
uint32_t tsVersion = 0; uint32_t tsVersion = 0;
......
...@@ -73,6 +73,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row); ...@@ -73,6 +73,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1; int32_t cqObjRef = -1;
int32_t cqVnodeNum = 0;
void cqRmFromList(SCqObj *pObj) { void cqRmFromList(SCqObj *pObj) {
//LOCK in caller //LOCK in caller
...@@ -166,6 +167,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { ...@@ -166,6 +167,8 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
return NULL; return NULL;
} }
atomic_add_fetch_32(&cqVnodeNum, 1);
cqCreateRef(); cqCreateRef();
pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ"); pContext->tmrCtrl = taosTmrInit(0, 0, 0, "CQ");
...@@ -240,6 +243,13 @@ void cqClose(void *handle) { ...@@ -240,6 +243,13 @@ void cqClose(void *handle) {
if (hasCq == 0) { if (hasCq == 0) {
freeSCqContext(pContext); freeSCqContext(pContext);
} }
int32_t remainn = atomic_sub_fetch_32(&cqVnodeNum, 1);
if (remainn <= 0) {
int32_t ref = cqObjRef;
cqObjRef = -1;
taosCloseRef(ref);
}
} }
void cqStart(void *handle) { void cqStart(void *handle) {
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
#include "tglobal.h"
static SDnodeCfg tsCfg = {0}; static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex; static pthread_mutex_t tsCfgMutex;
...@@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) { ...@@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) {
pthread_mutex_lock(&tsCfgMutex); pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId; tsCfg.dnodeId = cfg->dnodeId;
tsDnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg); dnodePrintCfg(cfg);
dnodeWriteCfg(); dnodeWriteCfg();
......
...@@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); ...@@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId); bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -163,6 +163,7 @@ int32_t* taosGetErrno(); ...@@ -163,6 +163,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist") #define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist")
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb") #define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x0364) //"Too many tags") #define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x0364) //"Too many tags")
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x0365) //"Too many columns")
#define TSDB_CODE_MND_TOO_MANY_TIMESERIES TAOS_DEF_ERROR_CODE(0, 0x0366) //"Too many time series") #define TSDB_CODE_MND_TOO_MANY_TIMESERIES TAOS_DEF_ERROR_CODE(0, 0x0366) //"Too many time series")
#define TSDB_CODE_MND_NOT_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x0367) //"Not super table") // operation only available for super table #define TSDB_CODE_MND_NOT_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x0367) //"Not super table") // operation only available for super table
#define TSDB_CODE_MND_COL_NAME_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x0368) //"Tag name too long") #define TSDB_CODE_MND_COL_NAME_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x0368) //"Tag name too long")
......
...@@ -2949,9 +2949,6 @@ static int readSampleFromCsvFileToMem( ...@@ -2949,9 +2949,6 @@ static int readSampleFromCsvFileToMem(
continue; continue;
} }
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", (long)readLen,
superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen); line, readLen);
getRows++; getRows++;
...@@ -3527,6 +3524,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3527,6 +3524,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
/*
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) { if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint; g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
...@@ -3536,6 +3534,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3536,6 +3534,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, batch_create_tbl_num not found\n"); printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
*/
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists if (childTblExists
...@@ -4619,7 +4618,8 @@ static int generateDataTail(char *tableName, int32_t tableSeq, ...@@ -4619,7 +4618,8 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
} }
static int generateSQLHead(char *tableName, int32_t tableSeq, static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer) threadInfo* pThreadInfo, SSuperTable* superTblInfo,
char *buffer, int remainderBufLen)
{ {
int len; int len;
if (superTblInfo) { if (superTblInfo) {
...@@ -4671,7 +4671,62 @@ static int generateSQLHead(char *tableName, int32_t tableSeq, ...@@ -4671,7 +4671,62 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return len; return len;
} }
static int generateProgressiveDataBuffer(char *pTblName, static int generateInterlaceDataBuffer(
char *tableName, int batchPerTbl, int i, int batchPerTblTimes,
int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startTime,
int *pRemainderBufLen)
{
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
*pRemainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int k = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
if (k > 0) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
pstr -= headLen;
pstr[0] = '\0';
}
return k;
}
static int generateProgressiveDataBuffer(
char *tableName,
int32_t tableSeq, int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
int64_t insertRows, int64_t insertRows,
...@@ -4691,6 +4746,7 @@ static int generateProgressiveDataBuffer(char *pTblName, ...@@ -4691,6 +4746,7 @@ static int generateProgressiveDataBuffer(char *pTblName,
assert(buffer != NULL); assert(buffer != NULL);
int k = 0;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int remainderBufLen = maxSqlLen; int remainderBufLen = maxSqlLen;
...@@ -4698,14 +4754,17 @@ static int generateProgressiveDataBuffer(char *pTblName, ...@@ -4698,14 +4754,17 @@ static int generateProgressiveDataBuffer(char *pTblName,
char *pstr = buffer; char *pstr = buffer;
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo, int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
buffer); buffer, remainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen; pstr += headLen;
remainderBufLen -= headLen; remainderBufLen -= headLen;
int k;
int dataLen; int dataLen;
k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo, k = generateDataTail(tableName, tableSeq, pThreadInfo, superTblInfo,
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom, g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom,
startTime, startTime,
pSamplePos, &dataLen); pSamplePos, &dataLen);
...@@ -4811,50 +4870,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4811,50 +4870,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL; return NULL;
} }
int headLen; int generated = generateInterlaceDataBuffer(
if (i == 0) { tableName, batchPerTbl, i, batchPerTblTimes,
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, tableSeq,
superTblInfo, pstr); pThreadInfo, pstr,
} else { insertRows,
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name,
tableName);
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
remainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int generated = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, remainderBufLen, insertRows, 0,
startTime, startTime,
&(pThreadInfo->samplePos), &dataLen); &remainderBufLen);
if (generated < 0) { if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %d\n", debugPrint("[%d] %s() LN%d, generated data is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace; goto free_and_statistics_interlace;
} else if (generated == 0) {
break;
} }
pstr += dataLen;
remainderBufLen -= dataLen;
tableSeq ++;
recOfBatch += batchPerTbl; recOfBatch += batchPerTbl;
// startTime += batchPerTbl * superTblInfo->timeStampStep; // startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl; pThreadInfo->totalInsertRows += batchPerTbl;
...@@ -4862,7 +4894,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -4862,7 +4894,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch); batchPerTbl, recOfBatch);
tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) { if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table // turn to first table
......
...@@ -437,14 +437,14 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) { ...@@ -437,14 +437,14 @@ static int32_t mnodeCheckClusterCfgPara(const SClusterCfg *clusterCfg) {
return TAOS_DN_OFF_TIME_ZONE_NOT_MATCH; return TAOS_DN_OFF_TIME_ZONE_NOT_MATCH;
} }
if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) { // if (0 != strncasecmp(clusterCfg->locale, tsLocale, strlen(tsLocale))) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale); // mError("\"locale\"[%s - %s] cfg parameters inconsistent", clusterCfg->locale, tsLocale);
return TAOS_DN_OFF_LOCALE_NOT_MATCH; // return TAOS_DN_OFF_LOCALE_NOT_MATCH;
} // }
if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) { // if (0 != strncasecmp(clusterCfg->charset, tsCharset, strlen(tsCharset))) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset); // mError("\"charset\"[%s - %s] cfg parameters inconsistent.", clusterCfg->charset, tsCharset);
return TAOS_DN_OFF_CHARSET_NOT_MATCH; // return TAOS_DN_OFF_CHARSET_NOT_MATCH;
} // }
if (clusterCfg->enableBalance != tsEnableBalance) { if (clusterCfg->enableBalance != tsEnableBalance) {
mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, tsEnableBalance); mError("\"balance\"[%d - %d] cfg parameters inconsistent", clusterCfg->enableBalance, tsEnableBalance);
......
...@@ -1037,6 +1037,19 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1037,6 +1037,19 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg)); SCreateTableMsg* pCreate = (SCreateTableMsg*)((char*)pCreate1 + sizeof(SCMCreateTableMsg));
int16_t numOfTags = htons(pCreate->numOfTags);
if (numOfTags > TSDB_MAX_TAGS) {
mError("msg:%p, app:%p table:%s, failed to create, too many tags", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_TAGS;
}
int16_t numOfColumns = htons(pCreate->numOfColumns);
int32_t numOfCols = numOfColumns + numOfTags;
if (numOfCols > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
}
SSTableObj * pStable = calloc(1, sizeof(SSTableObj)); SSTableObj * pStable = calloc(1, sizeof(SSTableObj));
if (pStable == NULL) { if (pStable == NULL) {
mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName); mError("msg:%p, app:%p table:%s, failed to create, no enough memory", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
...@@ -1050,10 +1063,9 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1050,10 +1063,9 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
pStable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); pStable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
pStable->sversion = 0; pStable->sversion = 0;
pStable->tversion = 0; pStable->tversion = 0;
pStable->numOfColumns = htons(pCreate->numOfColumns); pStable->numOfColumns = numOfColumns;
pStable->numOfTags = htons(pCreate->numOfTags); pStable->numOfTags = numOfTags;
int32_t numOfCols = pStable->numOfColumns + pStable->numOfTags;
int32_t schemaSize = numOfCols * sizeof(SSchema); int32_t schemaSize = numOfCols * sizeof(SSchema);
pStable->schema = (SSchema *)calloc(1, schemaSize); pStable->schema = (SSchema *)calloc(1, schemaSize);
if (pStable->schema == NULL) { if (pStable->schema == NULL) {
...@@ -1064,11 +1076,6 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -1064,11 +1076,6 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); memcpy(pStable->schema, pCreate->schema, numOfCols * sizeof(SSchema));
if (pStable->numOfColumns > TSDB_MAX_COLUMNS || pStable->numOfTags > TSDB_MAX_TAGS) {
mError("msg:%p, app:%p table:%s, failed to create, too many columns", pMsg, pMsg->rpcMsg.ahandle, pCreate->tableName);
return TSDB_CODE_MND_INVALID_TABLE_NAME;
}
pStable->nextColId = 0; pStable->nextColId = 0;
for (int32_t col = 0; col < numOfCols; col++) { for (int32_t col = 0; col < numOfCols; col++) {
...@@ -1340,6 +1347,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1340,6 +1347,11 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_MND_APP_ERROR;
} }
if (pStable->numOfColumns + ncols + pStable->numOfTags > TSDB_MAX_COLUMNS) {
mError("msg:%p, app:%p stable:%s, add column, too many columns", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
return TSDB_CODE_MND_TOO_MANY_COLUMNS;
}
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) { if (mnodeFindSuperTableColumnIndex(pStable, schema[i].name) > 0) {
mError("msg:%p, app:%p stable:%s, add column, column:%s already exist", pMsg, pMsg->rpcMsg.ahandle, mError("msg:%p, app:%p stable:%s, add column, column:%s already exist", pMsg, pMsg->rpcMsg.ahandle,
......
...@@ -994,6 +994,7 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) { ...@@ -994,6 +994,7 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
pVgroup->dbName); pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue;
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i, mDebug("vgId:%d, index:%d, send sync vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp); pVgroup->vnodeGid[i].pDnode->dnodeEp);
......
...@@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() { ...@@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2; return tsShellActivityTimer * 2;
} }
int64_t genQueryId(void) {
int64_t uid = 0;
int64_t did = tsDnodeId;
uid = did << 54;
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
uid |= pid << 44;
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
uid |= ts << 11;
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
uid |= sid;
return uid;
}
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') { if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
...@@ -6445,6 +6469,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6445,6 +6469,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
goto _cleanup_qinfo; goto _cleanup_qinfo;
} }
pQInfo->qId = *qId;
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
SQuery* pQuery = &pQInfo->query; SQuery* pQuery = &pQInfo->query;
...@@ -6581,8 +6607,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6581,8 +6607,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor // todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo; return pQInfo;
......
...@@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const ...@@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const
bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) { bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo; SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1; int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval); GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval); GET_TYPED_DATA(maxv, int64_t, type, maxval);
...@@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma ...@@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma
bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) { bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo; SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1; int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval); GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval); GET_TYPED_DATA(maxv, int64_t, type, maxval);
......
...@@ -287,6 +287,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) { ...@@ -287,6 +287,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) {
tdListPrependNode(pList, pi->pn); tdListPrependNode(pList, pi->pn);
} }
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2 + sizeof(tFilePage);
}
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
pResultBuf->statis.getPages += 1; pResultBuf->statis.getPages += 1;
...@@ -311,7 +315,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 ...@@ -311,7 +315,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
// allocate buf // allocate buf
if (availablePage == NULL) { if (availablePage == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES + 2); // add extract bytes in case of zipped buffer increased. pi->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
} else { } else {
pi->pData = availablePage; pi->pData = availablePage;
} }
...@@ -355,7 +359,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { ...@@ -355,7 +359,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
} }
if (availablePage == NULL) { if (availablePage == NULL) {
(*pi)->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES); (*pi)->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize));
} else { } else {
(*pi)->pData = availablePage; (*pi)->pData = availablePage;
} }
......
...@@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code; return code;
} }
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo); assert(pQInfo && pQInfo->signature == pQInfo);
......
...@@ -1367,8 +1367,7 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1367,8 +1367,7 @@ static void rpcProcessConnError(void *param, void *id) {
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle); tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
if (pContext->numOfTry >= pContext->epSet.numOfEps if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
|| pContext->msgType == TSDB_MSG_TYPE_FETCH) {
rpcMsg.msgType = pContext->msgType+1; rpcMsg.msgType = pContext->msgType+1;
rpcMsg.ahandle = pContext->ahandle; rpcMsg.ahandle = pContext->ahandle;
rpcMsg.code = pContext->code; rpcMsg.code = pContext->code;
......
...@@ -409,23 +409,22 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) ...@@ -409,23 +409,22 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force)
syncReleaseNode(pNode); syncReleaseNode(pNode);
} }
#if 1
void syncRecover(int64_t rid) { void syncRecover(int64_t rid) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
SSyncNode *pNode = syncAcquireNode(rid); SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return; if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole); (*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
nodeVersion = 0;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
nodeVersion = 0;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -436,7 +435,6 @@ void syncRecover(int64_t rid) { ...@@ -436,7 +435,6 @@ void syncRecover(int64_t rid) {
syncReleaseNode(pNode); syncReleaseNode(pNode);
} }
#endif
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) { int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = syncAcquireNode(rid); SSyncNode *pNode = syncAcquireNode(rid);
...@@ -1000,17 +998,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -1000,17 +998,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL); code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else { } else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead); code = syncSaveIntoBuffer(pPeer, pHead);
} else { } else {
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus], sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
pHead->version); pHead->version);
code = -1;
} }
} }
if (code != 0) {
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
syncRestartConnection(pPeer);
}
} }
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) { static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
......
...@@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows; pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
} }
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
} }
int64_t elapsed = taosGetTimestampUs() - stime; int64_t elapsed = taosGetTimestampUs() - stime;
......
...@@ -175,6 +175,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long") ...@@ -175,6 +175,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, "Too many time series") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, "Too many time series")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, "Not super table") // operation only available for super table TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, "Not super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, "Tag name too long") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, "Tag name too long")
......
...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag; ...@@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int64_t queuedWMsgSize;
int32_t queuedWMsg; int32_t queuedWMsg;
int32_t queuedRMsg; int32_t queuedRMsg;
int32_t flowctrlLevel; int32_t flowctrlLevel;
......
...@@ -99,8 +99,13 @@ int32_t vnodeSync(int32_t vgId) { ...@@ -99,8 +99,13 @@ int32_t vnodeSync(int32_t vgId) {
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
} }
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role == TAOS_SYNC_ROLE_SLAVE) {
vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vInfo("vgId:%d, vnode will sync, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
pVnode->version = 0;
pVnode->fversion = 0;
walResetVersion(pVnode->wal, pVnode->fversion);
syncRecover(pVnode->sync); syncRecover(pVnode->sync);
} }
......
...@@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { ...@@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
void * pCont = pRead->pCont; void * pCont = pRead->pCont;
int32_t contLen = pRead->contLen; int32_t contLen = pRead->contLen;
...@@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
uint64_t qId = 0; uint64_t qId = genQueryId();
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "vnodeStatus.h" #include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 100000 #define MAX_QUEUED_MSG_NUM 100000
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
extern void * tsDnodeTmr; extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
...@@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { ...@@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
} }
} }
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return TSDB_CODE_VND_NO_DISKSPACE;
}
if (!vnodeInReadyOrUpdatingStatus(pVnode)) { if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId, vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
vnodeStatus[pVnode->status], pVnode->refCount, pVnode); vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
...@@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) { ...@@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
} }
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) { int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3; int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
if (ms > 100) ms = 100; if (ms > 100) ms = 100;
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms); vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
taosMsleep(ms); taosMsleep(ms);
} }
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg); vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { ...@@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1); int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued); int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
pWrite->rpcMsg.ahandle, queued, queuedSize);
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { ...@@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode; SVnodeObj *pVnode = pWrite->pVnode;
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0; if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0; if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
pVnode->flowctrlLevel <= 0)
return 0;
if (tsEnableFlowCtrl == 0) { if (tsEnableFlowCtrl == 0) {
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2); int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
......
...@@ -28,7 +28,8 @@ ...@@ -28,7 +28,8 @@
int points = 5; int points = 5;
int numOfTables = 3; int numOfTables = 3;
int tablesProcessed = 0; int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et; int64_t st, et;
typedef struct { typedef struct {
...@@ -134,6 +135,9 @@ int main(int argc, char *argv[]) ...@@ -134,6 +135,9 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) { for (i = 0; i<numOfTables; ++i) {
// insert records in asynchronous API // insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i); sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
...@@ -143,10 +147,20 @@ int main(int argc, char *argv[]) ...@@ -143,10 +147,20 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n"); printf("once insert finished, presse any key to query\n");
getchar(); getchar();
while(1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
printf("start to query...\n"); printf("start to query...\n");
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesProcessed = 0;
for (i = 0; i < numOfTables; ++i) { for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API // select records in asynchronous API
...@@ -157,14 +171,8 @@ int main(int argc, char *argv[]) ...@@ -157,14 +171,8 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n"); printf("\nonce finished, press any key to exit\n");
getchar(); getchar();
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
getchar();
while(1) { while(1) {
if (tablesProcessed < numOfTables) { if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n"); printf("wait for process finished\n");
sleep(1); sleep(1);
continue; continue;
...@@ -173,6 +181,10 @@ int main(int argc, char *argv[]) ...@@ -173,6 +181,10 @@ int main(int argc, char *argv[])
break; break;
} }
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
taos_close(taos); taos_close(taos);
free(tableList); free(tableList);
...@@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code) ...@@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
} }
else { else {
printf("%d rows data are inserted into %s\n", points, pTable->name); printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesProcessed++; tablesInsertProcessed++;
if (tablesProcessed >= numOfTables) { if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables); printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
...@@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) ...@@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
//taos_free_result(tres); //taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name); printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesProcessed++; tablesSelectProcessed++;
if (tablesProcessed >= numOfTables) { if (tablesSelectProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL); gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables); printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
} }
taos_free_result(tres);
} }
taos_free_result(tres);
} }
void taos_select_call_back(void *param, TAOS_RES *tres, int code) void taos_select_call_back(void *param, TAOS_RES *tres, int code)
...@@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) ...@@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
taos_cleanup(); taos_cleanup();
exit(1); exit(1);
} }
taos_free_result(tres);
} }
...@@ -258,6 +258,8 @@ python3 test.py -f subscribe/singlemeter.py ...@@ -258,6 +258,8 @@ python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py #python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py python3 test.py -f subscribe/supertable.py
# topic
python3 ./test.py -f topic/topicQuery.py
#======================p3-end=============== #======================p3-end===============
#======================p4-start=============== #======================p4-start===============
......
...@@ -43,6 +43,9 @@ class TDTestCase: ...@@ -43,6 +43,9 @@ class TDTestCase:
tdSql.query("select * from tb") tdSql.query("select * from tb")
tdSql.checkRows(insertRows + 4) tdSql.checkRows(insertRows + 4)
# test case for https://jira.taosdata.com:18080/browse/TD-3716:
tdSql.error("insert into tb(now, 1)")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
def run(self):
tdSql.prepare()
# test case for https://jira.taosdata.com:18080/browse/TD-3679
print("==============step1")
tdSql.execute(
"create topic tq_test partitions 10")
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
print("==============step2")
tdSql.query("select * from tq_test.p1")
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.execute("use db")
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
tdSql.query("select * from test")
tdSql.checkRows(4)
tdSql.query("select * from test where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.query("select * from test where start >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where start > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where start = %d" % self.ts)
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册