任务 - EACH 迭代任务
EACH 任务概述
EACH用于对批量数据进行迭代处理,采用管道Pipeline式设计。首先输入一个数据源,然后执行一系列的中间步骤,最后得到结果输出。EACH实现类是EachTask其底层是基于java8中Stream流实现。
EachTask 结构
java
public class EachTask extends AbstractTask {
private Stream<?> stream;
}stream属性不可以直接设置,而是通过输入、中间、输出三种方法进行进组装而来。
EACH 任务方法列表
| 类型 | 方法 | 参数 | 备注 |
|---|---|---|---|
| 输入 | from | 数组或集合 | 指定一个数组或集合做为数据源 |
| 中间 | filter 过滤 | 闭包 | 返回布尔值,等于true该项将进入后续操作 |
| 中间 | map 转换 | 闭包 | 返回一个新值,后续操作基于新值进行处理 |
| 中间 | delay 延迟执行 | 整型 | delay 1000 表示延迟1秒 |
| 中间 | limit 限数 | 整型 | 限制进入后续流程的项目数 |
| 中间 | distinct 去重 | 闭包或空 | distinct() 默认去重distinct { item-> item.id } 基于指定属性进行去重 |
| 中间 | sorted排序 | 闭包或空 | sorted() 默认排序sorted {item-> item.age } 基于指定属性进行排序 |
| 输出 | list 返回列表 | 缺省结果操作 | |
| 输出 | first 强制返回第一项 | 如果没有结果将会返null | |
| 输出 | anyMatch任意项匹配 | 闭包 | 任意一个结果满足条件时返回true否则返回false |
| 输出 | allMatch所有项匹配 | 闭包 | 所有结果满足条件时返回true否则返回false |
| 输出 | count计数 | 返回结果总数量 | |
| 输出 | groupBy对结果进行统计 | 闭包 | 返回一个Map<?,List>基于指定的key进行统计分组。 |
方法使用规则
- 输入方法只能设置一个必须在开头处
- 输出方法只能设置一个必须在结尾处
- 中间步骤方法可以有多个
from 数据源说明
from 1..5指定一个范围作为数据源 包含1到5总共6个数字from 1,2,3,4,5指定多个参为一组数据源from input.items指定一个变量作为数据源
EACH 基本示例
t1= EACH {
from 1,2,3,3,5 // 构建一个数据源
filter { item -> // 找出所有奇数
item %2 ==1
}
map { item-> // 转换
"奇数$item"
}
distinct() // 去重
list() // 返回列表 (缺省结果操作 可省略)
}t1 执行结果是:
["奇数1","奇数3","奇数5"]获取数据项
每个节点可通过在闭包中显示声明一个参数来获取上一个节点的数据项
t1=EACH {
from [3,2,1,4,5]
map { item ->
[id:item, name: "luban"] // 封装用户信息
}
sorted { item -> item.id } // 基于id排序
limit 2
}最后返回结果:
[
[id:1,name:"luban"],
[id:2,name:"luban"]
]- Groovy 闭包会声明一个隐式参数
it但由于EACH任务本身的闭包也声明了这个参数,所以通过it无法正常获取数项,必须显示声明一个与其它变量或关键字不冲突的参数名。
引用其它任务
在EACH中可嵌套一个其它任务作为一个中间map操作,所有项会依次调用该任务, 并将结果传递到下个操作中。
t1=EACH {
from 1,2,3
HTTP { item ->
method ="POST"
url = "https://httpbin.org/post"
json """
{
"id": "${item}",
"name": "xiaoming_${item}",
"age": 18
}
"""
}
mysql { item ->
sql = "select * from users where id=${item.json.id}"
}
limit 2
}- 上例首先构建一个数据源列表1,2,3 然后将数字项依次传递给下一个步骤
HTTP任务拼装json参数后执行Http请求并将结果传递给下一个步骤mysql任务是一个app任务 它会拿到上一步结果拼成参数后执行- 上例中http与sql未出错的情况下将执行两次,因为第三次被
limit 2这个步骤限制了 - 最后该EACH任务将返回 一个mysql查询的结果列表
注意:严禁在
EACH任务块中继续嵌套引用EACH任务会导至DSL变得复杂。
上述示例from指定了三个项目,只有前2个将作为http的参数被调用,因数第三个被limit 2限制了,最后http调用的结果将组成一个新的列表被返回.
迭代中的分支如何处理
EACH 管道流为单向设计,无法进行分支处理,这种情况正确做法是将分支拆分成多条管道。如学生成绩有有优秀、普通、不及格 三种情况:
// 优秀成绩处理管道
t1=EACH {
from input.student
filter { item ->
item.score >=90
}
map { }
count() // 返回优秀学生数
}
// 普通成绩处理管道
t2=EACH {
from input.student
filter { item ->
item.score in 60..89
}
map { }
count() // 普通成绩数
}
// 不及格成绩处理管道
t3=EACH {
from input.student
filter { item ->
item.score < 60
}
map { }
count() // 不及格数
}注意事项
- 如果分支项过多,或者是分支中又存在分支,这种复杂情况不建议采用
EACH管道实现,而是脚本中采用循环进行处理。 - 引用外部任务时不能采用快捷方式声明,只能采用带任务块(闭包)方式声明。