提交 0d52707f 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/master' into hotfix/TD-3606

......@@ -937,6 +937,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return ret;
}
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
if (TSDB_CODE_TSC_ACTION_IN_PROGRESS == code) {
return code;
......@@ -945,6 +949,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
} else {
sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
if (pCmd->curSql == NULL) {
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS);
......@@ -953,10 +961,6 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
*sqlstr = sql;
if (*sqlstr == NULL) {
code = TSDB_CODE_TSC_INVALID_SQL;
}
return code;
}
......
......@@ -212,7 +212,7 @@ float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0;
float tsUsedDataDirGB = 0;
float tsReservedTmpDirectorySpace = 1.0f;
float tsMinimalDataDirGB = 1.0f;
float tsMinimalDataDirGB = 2.0f;
int32_t tsTotalMemoryMB = 0;
uint32_t tsVersion = 0;
......
......@@ -507,11 +507,6 @@ static void resetAfterAnsiEscape(void) {
static int taosRandom()
{
struct timeval tv;
gettimeofday(&tv, NULL);
srand(tv.tv_usec);
return rand();
}
......@@ -4521,22 +4516,23 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
} else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) {
int rand_num = taosRandom() % 100;
int randTail;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime
+ superTblInfo->timeStampStep * k
- taosRandom() % superTblInfo->disorderRange;
retLen = generateRowData(
data,
d,
superTblInfo);
randTail = (superTblInfo->timeStampStep * k
+ (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %d\n", randTail);
} else {
randTail = superTblInfo->timeStampStep * k;
}
uint64_t d = startTime
+ randTail;
retLen = generateRowData(
data,
startTime + superTblInfo->timeStampStep * k,
d,
superTblInfo);
}
}
if (retLen > remainderBufLen) {
break;
......@@ -4551,20 +4547,21 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
int lenOfBinary = g_args.len_of_binary;
int rand_num = taosRandom() % 100;
int randTail;
if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRatio)) {
int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k
- taosRandom() % g_args.disorderRange;
retLen = generateData(data, data_type,
ncols_per_record, d, lenOfBinary);
randTail = (DEFAULT_TIMESTAMP_STEP * k
+ (taosRandom() % g_args.disorderRange + 1)) * (-1);
debugPrint("rand data generated, back %d\n", randTail);
} else {
randTail = DEFAULT_TIMESTAMP_STEP * k;
}
retLen = generateData(data, data_type,
ncols_per_record,
startTime + DEFAULT_TIMESTAMP_STEP * k,
startTime + randTail,
lenOfBinary);
}
if (len > remainderBufLen)
break;
......@@ -5106,7 +5103,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio
&& rand_num < winfo->superTblInfo->disorderRatio) {
int64_t d = winfo->lastTs - taosRandom() % winfo->superTblInfo->disorderRange;
int64_t d = winfo->lastTs - (taosRandom() % winfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, winfo->superTblInfo);
} else {
generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
......
......@@ -124,7 +124,7 @@ bool greaterEqualOperator(SColumnFilterElem *pFilter, const char *minval, const
bool equalOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval);
......@@ -202,7 +202,7 @@ bool likeOperator(SColumnFilterElem *pFilter, const char *minval, const char *ma
bool notEqualOperator(SColumnFilterElem *pFilter, const char *minval, const char *maxval, int16_t type) {
SColumnFilterInfo *pFilterInfo = &pFilter->filterInfo;
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TIMESTAMP) {
int64_t minv = -1, maxv = -1;
GET_TYPED_DATA(minv, int64_t, type, minval);
GET_TYPED_DATA(maxv, int64_t, type, maxval);
......
......@@ -997,16 +997,23 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
code = syncSaveIntoBuffer(pPeer, pHead);
} else {
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
pHead->version);
code = -1;
}
}
if (code != 0) {
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
syncRestartConnection(pPeer);
}
}
......
......@@ -2861,12 +2861,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
}
}
int64_t elapsed = taosGetTimestampUs() - stime;
......
......@@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t flowctrlLevel;
......
......@@ -25,6 +25,7 @@
#include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 100000
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
extern void * tsDnodeTmr;
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
......@@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
}
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
return TSDB_CODE_VND_NO_DISKSPACE;
}
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
......@@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
}
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
if (queued > MAX_QUEUED_MSG_NUM) {
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
if (ms > 100) ms = 100;
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
taosMsleep(ms);
}
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS;
......@@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
SVnodeObj *pVnode = vparam;
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
pWrite->rpcMsg.ahandle, queued, queuedSize);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
......@@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
pVnode->flowctrlLevel <= 0)
return 0;
if (tsEnableFlowCtrl == 0) {
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
......
......@@ -28,7 +28,8 @@
int points = 5;
int numOfTables = 3;
int tablesProcessed = 0;
int tablesInsertProcessed = 0;
int tablesSelectProcessed = 0;
int64_t st, et;
typedef struct {
......@@ -134,6 +135,9 @@ int main(int argc, char *argv[])
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesInsertProcessed = 0;
tablesSelectProcessed = 0;
for (i = 0; i<numOfTables; ++i) {
// insert records in asynchronous API
sprintf(sql, "insert into %s values(%ld, 0)", tableList[i].name, 1546300800000 + i);
......@@ -143,10 +147,20 @@ int main(int argc, char *argv[])
printf("once insert finished, presse any key to query\n");
getchar();
while(1) {
if (tablesInsertProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
}
break;
}
printf("start to query...\n");
gettimeofday(&systemTime, NULL);
st = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
tablesProcessed = 0;
for (i = 0; i < numOfTables; ++i) {
// select records in asynchronous API
......@@ -157,14 +171,8 @@ int main(int argc, char *argv[])
printf("\nonce finished, press any key to exit\n");
getchar();
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
getchar();
while(1) {
if (tablesProcessed < numOfTables) {
if (tablesSelectProcessed < numOfTables) {
printf("wait for process finished\n");
sleep(1);
continue;
......@@ -173,6 +181,10 @@ int main(int argc, char *argv[])
break;
}
for (i = 0; i<numOfTables; ++i) {
printf("%s inserted:%d retrieved:%d\n", tableList[i].name, tableList[i].rowsInserted, tableList[i].rowsRetrieved);
}
taos_close(taos);
free(tableList);
......@@ -214,8 +226,8 @@ void taos_insert_call_back(void *param, TAOS_RES *tres, int code)
}
else {
printf("%d rows data are inserted into %s\n", points, pTable->name);
tablesProcessed++;
if (tablesProcessed >= numOfTables) {
tablesInsertProcessed++;
if (tablesInsertProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables);
......@@ -251,15 +263,17 @@ void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows)
//taos_free_result(tres);
printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name);
tablesProcessed++;
if (tablesProcessed >= numOfTables) {
tablesSelectProcessed++;
if (tablesSelectProcessed >= numOfTables) {
gettimeofday(&systemTime, NULL);
et = systemTime.tv_sec * 1000000 + systemTime.tv_usec;
printf("%lld mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables);
}
}
taos_free_result(tres);
}
}
void taos_select_call_back(void *param, TAOS_RES *tres, int code)
......@@ -276,6 +290,4 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
taos_cleanup();
exit(1);
}
taos_free_result(tres);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册