提交 83c6f213 编写于 作者: M Mark Needham

apoc.periodic.iterate

上级 2aeddb52
......@@ -26,6 +26,9 @@ apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config
|config|MAP?|null
|===
== Config parameters
include::partial$usage/config/apoc.periodic.iterate.adoc[]
== Output parameters
[.procedures, opts=header]
|===
......@@ -44,5 +47,9 @@ apoc.periodic.iterate(cypherIterate :: STRING?, cypherAction :: STRING?, config
|failedParams|MAP?
|===
[[usage-apoc.periodic.iterate]]
== Usage Examples
include::partial$usage/apoc.periodic.iterate.adoc[]
xref::graph-updates/periodic-execution.adoc#commit-batching[More documentation of apoc.periodic.iterate,role=more information]
Let's go through some examples.
If you were to add an `:Actor` label to several million `:Person` nodes, you could run the following code:
[source,cypher]
----
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
----
Let's break down the parameters passed to the procedure:
* Our first Cypher statement selects all the `Person` nodes with an `ACTED_IN` relationship to another node and returns those persons.
This is the data-driven portion where we select the data that we want to change.
* Our second Cypher statement sets the `:Actor` label on each of the `Person` nodes selected.
This is the operation portion where we apply the change to the data from our first statement.
* And finally, we specify any configuration we want the procedure to use.
We have defined a `batchSize` of 10,000 and to run the statements in parallel.
Executing this procedure would take all of our `Person` nodes gathered in the first Cypher statement and update each of them with the second Cypher statement.
It divides the work into batches - taking 10,000 `Person` nodes from the stream and updating them in a single transaction.
If we have 30,000 `Person` nodes in our graph with an `ACTED_IN` relationship, then it would break this down into 3 batches.
Finally, it runs those in parallel, as updating node labels or properties do not conflict.
[NOTE]
====
For more complex operations like updating or removing relationships, either *do not use parallel: true* OR make sure that you batch the work in a way that each subgraph of data is updated in one operation, such as by transferring the root objects.
If you attempt complex operations, also enable retrying failed operations, e.g. with `retries:3`.
====
Now let us look at a more complex example.
[source,cypher]
----
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
"MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
----
Let's break down the parameters passed to the procedure:
* Our first Cypher statement selects all the `Order` nodes that have an order date greater than `October 13, 2016` (first Cypher statement).
* Our second Cypher statement takes those groups and finds the nodes that have a `HAS_ITEM` relationship to other nodes, then sums up the value of those items and sets that sum as a property (`o.value`) for the total order value.
* Our configuration will batch those nodes into groups of 100 (`batchSize:100`) and run the batches in parallel for the second statement to process.
=== Batch mode: BATCH_SINGLE
If our operation statement calls a procedure that takes in a batch of values, we can use `batchMode: "BATCH_SINGLE"` to get access to a batch of values to pass to that procedure.
When we use `BATCH_SINGLE`, the operation statement will have access to the `$_batch` parameter, which will contain a list of the fields returned in the data-driven statement.
For example, if the data driven statement is:
[source,cypher]
----
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
----
The contents of the `$_batch` variable passed to the operation statement would be:
[source,text]
----
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
----
Let's see an example of this in action.
We'll start by creating some nodes:
.The following query creates 100,000 nodes with the label `Person` and property `id`
[source,cypher]
----
UNWIND range(1,100000) as id create (:Person {id: id})
----
We can delete these nodes using the `apoc.nodes.delete` procedure.
See xref::graph-updates/data-deletion.adoc[].
This procedure takes in a list of nodes, which we can extract from the `$_batch` parameter.
.The following query streams all the `Person` nodes and deletes them in batches of 100
[source,cypher]
----
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN p",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
----
The contents of the `$_batch` parameter that is used in the operation statement would be as follows:
[source,text]
----
[
{p: Node<1>},
{p: Node<2>},
...
]
----
We can use a https://neo4j.com/docs/cypher-manual/current/syntax/lists/#cypher-list-comprehension[list comprehension^] to extract the `p` variable from each item in the list.
If we run this query, we'll see the following output:
.Results
[options="header"]
|===
| batch | operations
| {total: 1000, committed: 1000, failed: 0, errors: {}} | {total: 100000, committed: 100000, failed: 0, errors: {}}
|===
\ No newline at end of file
The procedure support the following config parameters:
.Config parameters
[opts=header]
|===
| name | type | default | description
| batchSize | Long | 10000 | run the specified number of operation statements in a single tx - params: {_count, _batch}
| parallel | boolean | false | run operation statements in parallel (note that statements might deadlock if conflicting)
| retries | Long | 0 | if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param \{_retry}
| batchMode | String | "BATCH" a| how data-driven statements should be processed by operation statement. Valid values are:
* "BATCH" - execute operation statement once per batchSize. Operation statement is prefixed with the following, which extracts each field returned in the data-driven statement from the `$_batch` parameter:
[source,cypher]
----
UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
----
* "SINGLE" - execute operation statement one at a time
* "BATCH_SINGLE" - execute operation statement once per batchSize, but leaves unpacking of batch to the operation statement.
The operation query can access the batched values via the `$_batch` parameter.
| params | Map | {} | externally pass in map of params
| concurrency | Long | 50 | number of concurrent tasks are generated when using `parallel:true`
| failedParams | Long | -1 | if set to a non-negative value, each failed batch up to `failedParams` parameter sets are returned in `yield failedParams`.
|===
[NOTE]
====
In APOC versions 4.0.0.11 and earlier, the `iterateList` config key was used to control the batching of values returned by the data-driven statement.
This was replaced by `batchMode` in version 4.0.0.12.
These config keys are treated as follows:
* If `batchMode` is provided, its value takes precedence over `iterateList`
* If `batchMode` is not provided and `iterateList` is provided, the value of `iterateList` will be translated as described in the table below.
* If neither `batchMode` nor `iterateList` are provided, `batchMode` defaults to `BATCH`, which is the same as `iterateList:true`
====
.Deprecated Config
[options=header]
|===
| param | default | description
| iterateList | true a| execute operation statements once per batchSize (whole batchSize list is passed in as parameter \{_batch})
* A value of true is equivalent to `batchMode: "BATCH"`
* A value of false is equivalent to `batchMode: "SINGLE"`
|===
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册