提交 578ef945 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/sim

...@@ -89,12 +89,17 @@ int taosInitTimer(void (*callback)(int), int ms) { ...@@ -89,12 +89,17 @@ int taosInitTimer(void (*callback)(int), int ms) {
if (code != 0) { if (code != 0) {
uError("failed to create timer thread"); uError("failed to create timer thread");
return -1; return -1;
} else {
uDebug("timer thread:0x%08" PRIx64 " is created", taosGetPthreadId(timerThread));
} }
return 0; return 0;
} }
void taosUninitTimer() { void taosUninitTimer() {
stopTimer = true; stopTimer = true;
uDebug("join timer thread:0x%08" PRIx64, taosGetPthreadId(timerThread));
pthread_join(timerThread, NULL); pthread_join(timerThread, NULL);
} }
......
...@@ -1667,7 +1667,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS ...@@ -1667,7 +1667,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
_end: _end:
assert(offset >= 0 && tsCols != NULL); assert(offset >= 0 && tsCols != NULL);
if (prevTs != INT64_MIN) { if (prevTs != INT64_MIN && prevTs != *(int64_t*)pRuntimeEnv->prevRow[0]) {
assert(prevRowIndex >= 0); assert(prevRowIndex >= 0);
item->lastKey = prevTs + step; item->lastKey = prevTs + step;
} }
...@@ -7728,4 +7728,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) { ...@@ -7728,4 +7728,4 @@ void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle) {
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0; return 0;
} }
\ No newline at end of file
...@@ -90,7 +90,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -90,7 +90,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
break; break;
} }
sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, sDebug("%s, file:%s info is received from master, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id,
minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic); minfo.name, minfo.index, minfo.size, minfo.fversion, minfo.magic);
// remove extra files on slave between the current and last index // remove extra files on slave between the current and last index
...@@ -100,13 +100,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -100,13 +100,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info // check the file info
sinfo = minfo; sinfo = minfo;
sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion); sinfo.magic = (*pNode->getFileInfo)(pNode->vgId, sinfo.name, &sinfo.index, TAOS_SYNC_MAX_INDEX, &sinfo.size, &sinfo.fversion);
sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, sinfo.name, sDebug("%s, local file:%s info, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, sinfo.name,
sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic); sinfo.index, sinfo.size, sinfo.fversion, sinfo.magic);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(SFileAck)); memset(&fileAck, 0, sizeof(SFileAck));
syncBuildFileAck(&fileAck, pNode->vgId); syncBuildFileAck(&fileAck, pNode->vgId);
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; fileAck.sync = (sinfo.magic != minfo.magic || sinfo.size != minfo.size || sinfo.name[0] == 0) ? 1 : 0;
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck)); ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
......
...@@ -104,7 +104,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -104,7 +104,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId); syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%d", pPeer->id, fileInfo.name, sDebug("%s, file:%s info is sent, index:%d size:%" PRId64 " fver:%" PRIu64 " magic:%u", pPeer->id, fileInfo.name,
fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic); fileInfo.index, fileInfo.size, fileInfo.fversion, fileInfo.magic);
// send the file info // send the file info
...@@ -143,10 +143,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -143,10 +143,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// if sync is not required, continue // if sync is not required, continue
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
fileInfo.index++; fileInfo.index++;
sDebug("%s, %s is the same", pPeer->id, fileInfo.name); sDebug("%s, %s is the same, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
continue; continue;
} else { } else {
sDebug("%s, %s will be sent", pPeer->id, fileInfo.name); sDebug("%s, %s will be sent, fver:%" PRIu64, pPeer->id, fileInfo.name, fileInfo.fversion);
} }
// get the full path to file // get the full path to file
...@@ -328,7 +328,8 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) ...@@ -328,7 +328,8 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again // if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once = 1; once = 1;
offset += bytes; offset += bytes;
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset); sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64 " sver:%" PRIu64 " fver:%" PRIu64, pPeer->id,
bytes, offset, pPeer->sversion, fversion);
} }
return -1; return -1;
......
...@@ -90,7 +90,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -90,7 +90,10 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
// write into WAL // write into WAL
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) {
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
return code;
}
pVnode->version = pHead->version; pVnode->version = pHead->version;
......
...@@ -292,6 +292,9 @@ bool simExecuteRunBackCmd(SScript *script, char *option) { ...@@ -292,6 +292,9 @@ bool simExecuteRunBackCmd(SScript *script, char *option) {
if (pthread_create(&newScript->bgPid, NULL, simExecuteScript, (void *)newScript) != 0) { if (pthread_create(&newScript->bgPid, NULL, simExecuteScript, (void *)newScript) != 0) {
sprintf(script->error, "lineNum:%d. create background thread failed", script->lines[script->linePos].lineNum); sprintf(script->error, "lineNum:%d. create background thread failed", script->lines[script->linePos].lineNum);
return false; return false;
} else {
simDebug("script:%s, background thread:0x%08" PRIx64 " is created", newScript->fileName,
taosGetPthreadId(newScript->bgPid));
} }
script->linePos++; script->linePos++;
...@@ -448,7 +451,6 @@ void simCloseNativeConnect(SScript *script) { ...@@ -448,7 +451,6 @@ void simCloseNativeConnect(SScript *script) {
simDebug("script:%s, taos:%p closed", script->fileName, script->taos); simDebug("script:%s, taos:%p closed", script->fileName, script->taos);
taos_close(script->taos); taos_close(script->taos);
taosMsleep(1200);
script->taos = NULL; script->taos = NULL;
} }
......
...@@ -40,14 +40,14 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -40,14 +40,14 @@ int32_t main(int32_t argc, char *argv[]) {
printf("usage: %s [options] \n", argv[0]); printf("usage: %s [options] \n", argv[0]);
printf(" [-c config]: config directory, default is: %s\n", configDir); printf(" [-c config]: config directory, default is: %s\n", configDir);
printf(" [-f script]: script filename\n"); printf(" [-f script]: script filename\n");
exit(0); return 0;
} }
} }
if (!simSystemInit()) { if (!simSystemInit()) {
simError("failed to initialize the system"); simError("failed to initialize the system");
simSystemCleanUp(); simSystemCleanUp();
exit(1); return -1;
} }
simInfo("simulator is running ..."); simInfo("simulator is running ...");
...@@ -56,7 +56,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -56,7 +56,7 @@ int32_t main(int32_t argc, char *argv[]) {
SScript *script = simParseScript(scriptFile); SScript *script = simParseScript(scriptFile);
if (script == NULL) { if (script == NULL) {
simError("parse script file:%s failed", scriptFile); simError("parse script file:%s failed", scriptFile);
exit(-1); return -1;
} }
simScriptList[++simScriptPos] = script; simScriptList[++simScriptPos] = script;
......
...@@ -93,27 +93,34 @@ void simFreeScript(SScript *script) { ...@@ -93,27 +93,34 @@ void simFreeScript(SScript *script) {
for (int32_t i = 0; i < script->bgScriptLen; ++i) { for (int32_t i = 0; i < script->bgScriptLen; ++i) {
SScript *bgScript = script->bgScripts[i]; SScript *bgScript = script->bgScripts[i];
simInfo("script:%s, set stop flag", script->fileName); simDebug("script:%s, is background script, set stop flag", bgScript->fileName);
bgScript->killed = true; bgScript->killed = true;
if (taosCheckPthreadValid(bgScript->bgPid)) { if (taosCheckPthreadValid(bgScript->bgPid)) {
pthread_join(bgScript->bgPid, NULL); pthread_join(bgScript->bgPid, NULL);
} }
simDebug("script:%s, background thread joined", bgScript->fileName);
taos_close(bgScript->taos);
tfree(bgScript->lines);
tfree(bgScript->optionBuffer);
tfree(bgScript);
} }
}
simDebug("script:%s, is freed", script->fileName); simDebug("script:%s, is cleaned", script->fileName);
taos_close(script->taos); taos_close(script->taos);
tfree(script->lines); tfree(script->lines);
tfree(script->optionBuffer); tfree(script->optionBuffer);
tfree(script); tfree(script);
}
} }
SScript *simProcessCallOver(SScript *script) { SScript *simProcessCallOver(SScript *script) {
if (script->type == SIM_SCRIPT_TYPE_MAIN) { if (script->type == SIM_SCRIPT_TYPE_MAIN) {
simDebug("script:%s, is main script, set stop flag", script->fileName);
if (script->killed) { if (script->killed) {
simInfo("script:" FAILED_PREFIX "%s" FAILED_POSTFIX ", " FAILED_PREFIX "failed" FAILED_POSTFIX ", error:%s", simInfo("script:" FAILED_PREFIX "%s" FAILED_POSTFIX ", " FAILED_PREFIX "failed" FAILED_POSTFIX ", error:%s",
script->fileName, script->error); script->fileName, script->error);
exit(-1); return NULL;
} else { } else {
simInfo("script:" SUCCESS_PREFIX "%s" SUCCESS_POSTFIX ", " SUCCESS_PREFIX "success" SUCCESS_POSTFIX, simInfo("script:" SUCCESS_PREFIX "%s" SUCCESS_POSTFIX ", " SUCCESS_PREFIX "success" SUCCESS_POSTFIX,
script->fileName); script->fileName);
...@@ -125,13 +132,13 @@ SScript *simProcessCallOver(SScript *script) { ...@@ -125,13 +132,13 @@ SScript *simProcessCallOver(SScript *script) {
if (simScriptPos == -1) { if (simScriptPos == -1) {
simInfo("----------------------------------------------------------------------"); simInfo("----------------------------------------------------------------------");
simInfo("Simulation Test Done, " SUCCESS_PREFIX "%d" SUCCESS_POSTFIX " Passed:\n", simScriptSucced); simInfo("Simulation Test Done, " SUCCESS_PREFIX "%d" SUCCESS_POSTFIX " Passed:\n", simScriptSucced);
exit(0); return NULL;
} }
return simScriptList[simScriptPos]; return simScriptList[simScriptPos];
} }
} else { } else {
simInfo("script:%s, is stopped by main script", script->fileName); simDebug("script:%s, is stopped", script->fileName);
simFreeScript(script); simFreeScript(script);
return NULL; return NULL;
} }
...@@ -161,5 +168,6 @@ void *simExecuteScript(void *inputScript) { ...@@ -161,5 +168,6 @@ void *simExecuteScript(void *inputScript) {
} }
} }
simInfo("thread is stopped");
return NULL; return NULL;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册