From 23a1ab78a4d47a7ab556732144301d3273b96fdf Mon Sep 17 00:00:00 2001 From: sushuang Date: Thu, 21 Dec 2017 11:23:29 +0800 Subject: [PATCH] Fix stream tasks. --- src/chart/lines/linesLayout.js | 2 +- src/echarts.js | 1 + src/model/Series.js | 34 ++++------ src/stream/Scheduler.js | 19 +++--- src/stream/task.js | 112 ++++++++++++++++----------------- test/lines-stream-ny.html | 3 +- 6 files changed, 76 insertions(+), 95 deletions(-) diff --git a/src/chart/lines/linesLayout.js b/src/chart/lines/linesLayout.js index 1c2688bbd..d59deed9f 100644 --- a/src/chart/lines/linesLayout.js +++ b/src/chart/lines/linesLayout.js @@ -29,7 +29,7 @@ export default { points[offset++] = len; } for (var k = 0; k < len; k++) { - coordSys.dataToPoint(lineCoords[k], pt); + pt = coordSys.dataToPoint(lineCoords[k], pt); points[offset++] = pt[0]; points[offset++] = pt[1]; } diff --git a/src/echarts.js b/src/echarts.js index 66806c63a..cb14920f8 100644 --- a/src/echarts.js +++ b/src/echarts.js @@ -1191,6 +1191,7 @@ echartsProto.addData = function (params) { } seriesModel.appendData(params); + seriesModel.restoreData(true); this._scheduler.unfinished = true; }; diff --git a/src/model/Series.js b/src/model/Series.js index 22546cd83..67524b36d 100644 --- a/src/model/Series.js +++ b/src/model/Series.js @@ -66,20 +66,14 @@ var SeriesModel = ComponentModel.extend({ // this.settingTask = createTask(); - this.dataInitTask = createTask({ - reset: dataInitTaskReset, - count: dataInitTaskCount, - progress: dataInitTaskProgress - }, {model: this}); - - this.dataRestoreTask = createTask({ - reset: dataRestoreTaskReset + this.initTask = createTask({ + reset: initTaskReset, + count: initTaskCount }, {model: this}); this.mergeDefaultAndTheme(option, ecModel); var data = this.getInitialData(option, ecModel); - this.dataInitTask.dirty(); if (__DEV__) { zrUtil.assert(data, 'getInitialData returned invalid data.'); @@ -148,7 +142,7 @@ var SeriesModel = ComponentModel.extend({ var data = this.getInitialData(newSeriesOption, ecModel); // ??? set dirty on ecModel, becusue it will call mergeOption({})? - this.dataInitTask.dirty(); + this.initTask.dirty(); inner(this).dataBeforeProcessed = data; }, @@ -177,7 +171,8 @@ var SeriesModel = ComponentModel.extend({ * Append data to list */ appendData: function (parmas) { - this.getRawData().appendData(parmas.data); + var data = this.getRawData(); + data.appendData(parmas.data); }, /** @@ -336,8 +331,9 @@ var SeriesModel = ComponentModel.extend({ return animationEnabled; }, - restoreData: function () { - this.dataRestoreTask.dirty(); + restoreData: function (notDirty) { + this.setData(this.getRawData().cloneShallow()); + !notDirty && this.initTask.dirty(); }, getColorFromPalette: function (name, scope) { @@ -384,19 +380,11 @@ var SeriesModel = ComponentModel.extend({ zrUtil.mixin(SeriesModel, modelUtil.dataFormatMixin); zrUtil.mixin(SeriesModel, colorPaletteMixin); -function dataInitTaskCount(context) { +function initTaskCount(context) { return context.model.getRawData().getProvider().count(); } -function dataInitTaskReset(context) { - return {finished: true}; -} - -function dataInitTaskProgress(taskParams, context) {} - -function dataRestoreTaskReset(context) { - var model = context.model; - model.setData(model.getRawData().cloneShallow()); +function initTaskReset(context) { return {noProgress: true}; } diff --git a/src/stream/Scheduler.js b/src/stream/Scheduler.js index 70ea9628b..6dc12d837 100644 --- a/src/stream/Scheduler.js +++ b/src/stream/Scheduler.js @@ -59,23 +59,21 @@ proto.restorePipelines = function (ecModel) { var pipelines = scheduler._pipelineMap = createHashMap(); ecModel.eachSeries(function (seriesModel) { - var dataInitTask = seriesModel.dataInitTask; - var dataRestoreTask = seriesModel.dataRestoreTask; + var initTask = seriesModel.initTask; + var progressive = seriesModel.get('progressive'); pipelines.set(seriesModel.uid, { - head: dataInitTask, - tail: dataInitTask, + head: initTask, + tail: initTask, threshold: seriesModel.get('progressiveThreshold'), - incremental: seriesModel.get('progressive') + incremental: progressive && !(seriesModel.banIncremental && seriesModel.banIncremental()), bockIndex: -1, - step: seriesModel.get('progressive') || 700, // ??? Temporarily number + step: progressive || 700, // ??? Temporarily number count: 2 }); - dataInitTask.__block = dataRestoreTask.__block = true; - - pipe(scheduler, seriesModel, dataRestoreTask); + initTask.__pipelineId = seriesModel.uid; }); }; @@ -161,8 +159,7 @@ proto.performSeriesTasks = function (ecModel) { ecModel.eachSeries(function (seriesModel) { // Progress to the end for dataInit and dataRestore. - unfinished |= seriesModel.dataInitTask.perform(); - unfinished |= seriesModel.dataRestoreTask.perform(); + unfinished |= seriesModel.initTask.perform(); }); this.unfinished |= unfinished; diff --git a/src/stream/task.js b/src/stream/task.js index f69ef43cc..a9529499f 100644 --- a/src/stream/task.js +++ b/src/stream/task.js @@ -32,27 +32,19 @@ function Task(define, context) { var taskProto = Task.prototype; -taskProto.perform = function (performInfo) { - this.plan(); - progress(this, performInfo); - return this.unfinished(); -}; - -taskProto.dirty = function () { - this._dirty = true; - this.agentStubs && each(this.agentStubs, function (stub) { - stub._dirty = true; - }); - this.agent && (this.agent._dirty = true); -}; - -taskProto.plan = function () { - var finishedAfterReset; +/** + * @param {Object} performArgs + * @param {number} [performArgs.step] Specified step. + * @param {number} [performArgs.skip] Skip customer perform call. + */ +taskProto.perform = function (performArgs) { if (this._dirty) { this._dirty = false; - finishedAfterReset = reset(this); + reset(this); } + var step = performArgs && performArgs.step; + // This should always be performed so it can be passed to downstream. var upTask = this._upstream; if (upTask) { @@ -64,14 +56,53 @@ taskProto.plan = function () { } // If noProgress, pass index from upstream to downstream each time plan called. - if (finishedAfterReset || this._noProgress) { - this._dueIndex = this._outputDueEnd = this._dueEnd; + if (this._noProgress) { + this._dueIndex = this._outputDueEnd = this._dueEnd = count(this); + } + else { + if (__DEV__) { + assert(upTask || this._count); + } + + var start = this._dueIndex; + var end = Math.min( + step != null ? this._dueIndex + step : Infinity, + upTask ? this._dueEnd : Infinity, + this._count ? this._count(this.context) : Infinity + ); + + var outputDueEnd; + !(performArgs && performArgs.skip) && start < end && ( + outputDueEnd = this._progress({start: start, end: end}, this.context) + ); + + this._dueIndex = end; + // If no `outputDueEnd`, assume that output data and + // input data is the same, so use `dueIndex` as `outputDueEnd`. + if (outputDueEnd == null) { + outputDueEnd = end; + } + if (__DEV__) { + // ??? Can not rollback. + assert(outputDueEnd >= this._outputDueEnd); + } + this._outputDueEnd = outputDueEnd; } // Initialized in scheduler. each(this.agentStubs, function (agentStub) { - agentStub.plan(); + agentStub.perform(performArgs); }); + + return this.unfinished(); +}; + +taskProto.dirty = function () { + this._dirty = true; + this.agentStubs && each(this.agentStubs, function (stub) { + stub._dirty = true; + }); + this.agent && (this.agent._dirty = true); }; /** @@ -79,7 +110,7 @@ taskProto.plan = function () { */ function reset(taskIns) { taskIns._dueIndex = taskIns._outputDueEnd = 0; - taskIns._dueEnd = taskIns._count ? taskIns._count(taskIns.context) : 0; + taskIns._dueEnd = count(taskIns); var result = taskIns._reset && taskIns._reset(taskIns.context) || {}; @@ -88,45 +119,10 @@ function reset(taskIns) { taskIns._downstream && taskIns._downstream.dirty(); // FIXME taskIns.agent && taskIns.agent.dirty(); - - return result.finished; } -/** - * @param {Object} performInfo - * @param {number} [performInfo.step] Specified step. - * @param {number} [performInfo.skip] Skip customer perform call. - */ -function progress(taskIns, performInfo) { - var step = performInfo && performInfo.step; - - if (taskIns._noProgress) { - return; - } - - var start = taskIns._dueIndex; - var end = Math.min( - step != null ? start + step : Infinity, - taskIns._upstream ? taskIns._dueEnd : Infinity, - taskIns._count ? taskIns._count(taskIns.context) : Infinity - ); - - var outputDueEnd; - !(performInfo && performInfo.skip) && start < end && ( - outputDueEnd = taskIns._progress({start: start, end: end}, taskIns.context) - ); - - taskIns._dueIndex = end; - // If no `outputDueEnd`, assume that output data and - // input data is the same, so use `dueIndex` as `outputDueEnd`. - if (outputDueEnd == null) { - outputDueEnd = end; - } - if (__DEV__) { - // ??? Can not rollback. - assert(outputDueEnd >= taskIns._outputDueEnd); - } - taskIns._outputDueEnd = outputDueEnd; +function count(taskIns) { + return taskIns._count ? taskIns._count(taskIns.context) : 0; } taskProto.getProgressInfo = function () { diff --git a/test/lines-stream-ny.html b/test/lines-stream-ny.html index b4acda9ea..81d541a6a 100644 --- a/test/lines-stream-ny.html +++ b/test/lines-stream-ny.html @@ -38,7 +38,7 @@ var config = { dataLoading: 'whole', - progressive: true, + progressive: 40000, progressiveThreshold: 3000, largeModel: true }; @@ -91,7 +91,6 @@ chart.setOption({ - progressiveStep: 40000, animation: false, geo: { center: [-74.04327099998152, 40.86737600240287], -- GitLab