Skip to content

任务 - EACH 迭代任务

EACH 任务概述

EACH用于对批量数据进行迭代处理,采用管道Pipeline式设计。首先输入一个数据源,然后执行一系列的中间步骤,最后得到结果输出。EACH实现类是EachTask其底层是基于java8中Stream流实现。

EachTask 结构

java
public class EachTask extends AbstractTask {
    private Stream<?> stream;

}
  • stream属性不可以直接设置,而是通过输入、中间、输出三种方法进行进组装而来。

EACH 任务方法列表

类型方法参数备注
输入from数组或集合指定一个数组或集合做为数据源
中间filter 过滤闭包返回布尔值,等于true该项将进入后续操作
中间map 转换闭包返回一个新值,后续操作基于新值进行处理
中间delay 延迟执行整型delay 1000 表示延迟1秒
中间limit 限数整型限制进入后续流程的项目数
中间distinct 去重闭包或空distinct() 默认去重
distinct { item-> item.id } 基于指定属性进行去重
中间sorted排序闭包或空sorted() 默认排序
sorted {item-> item.age } 基于指定属性进行排序
输出list 返回列表缺省结果操作
输出first 强制返回第一项如果没有结果将会返null
输出anyMatch任意项匹配闭包任意一个结果满足条件时返回true否则返回false
输出allMatch所有项匹配闭包所有结果满足条件时返回true否则返回false
输出count计数返回结果总数量
输出groupBy对结果进行统计闭包返回一个Map<?,List>基于指定的key进行统计分组。

方法使用规则

  • 输入方法只能设置一个必须在开头处
  • 输出方法只能设置一个必须在结尾处
  • 中间步骤方法可以有多个

from 数据源说明

  • from 1..5 指定一个范围作为数据源 包含1到5总共6个数字
  • from 1,2,3,4,5指定多个参为一组数据源
  • from input.items 指定一个变量作为数据源

EACH 基本示例

t1= EACH {
  from 1,2,3,3,5		// 构建一个数据源
  filter { item -> // 找出所有奇数
    item %2 ==1
  }
  map { item-> 		// 转换
    "奇数$item"
  }
  distinct() 		 // 去重
  list()		 		// 返回列表 (缺省结果操作 可省略)
}

t1 执行结果是:

["奇数1","奇数3","奇数5"]

获取数据项

每个节点可通过在闭包中显示声明一个参数来获取上一个节点的数据项

t1=EACH {
  from [3,2,1,4,5]
  map { item ->
    [id:item, name: "luban"] // 封装用户信息
  }
  sorted { item -> item.id } // 基于id排序
  limit 2
}

最后返回结果:

[
  [id:1,name:"luban"],
  [id:2,name:"luban"]
]
  • Groovy 闭包会声明一个隐式参数it 但由于EACH任务本身的闭包也声明了这个参数,所以通过it无法正常获取数项,必须显示声明一个与其它变量或关键字不冲突的参数名。

引用其它任务

EACH中可嵌套一个其它任务作为一个中间map操作,所有项会依次调用该任务, 并将结果传递到下个操作中。

t1=EACH {
  from 1,2,3
  HTTP { item ->
    method ="POST"
    url = "https://httpbin.org/post"
    json """
    	{
        "id": "${item}",
        "name": "xiaoming_${item}",
        "age": 18
			}
    """
  }
  mysql { item ->
    sql = "select * from users where id=${item.json.id}"
  }
	limit 2
}
  • 上例首先构建一个数据源列表1,2,3 然后将数字项依次传递给下一个步骤
  • HTTP任务拼装json参数后执行Http请求并将结果传递给下一个步骤
  • mysql 任务是一个app任务 它会拿到上一步结果拼成参数后执行
  • 上例中http与sql未出错的情况下将执行两次,因为第三次被limit 2这个步骤限制了
  • 最后该EACH任务将返回 一个mysql查询的结果列表

注意:严禁在EACH任务块中继续嵌套引用EACH任务会导至DSL变得复杂。

上述示例from指定了三个项目,只有前2个将作为http的参数被调用,因数第三个被limit 2限制了,最后http调用的结果将组成一个新的列表被返回.

迭代中的分支如何处理

EACH 管道流为单向设计,无法进行分支处理,这种情况正确做法是将分支拆分成多条管道。如学生成绩有有优秀、普通、不及格 三种情况:

  // 优秀成绩处理管道
t1=EACH {
  from input.student
  filter { item ->
    item.score >=90
  }
  map { }
	count() // 返回优秀学生数
}

 // 普通成绩处理管道
t2=EACH {
  from input.student
  filter {  item ->
    item.score in 60..89
  }
  map { }
	count() // 普通成绩数
}

// 不及格成绩处理管道
t3=EACH {
  from input.student
  filter { item ->
    item.score < 60
  }
  map {  }
	count() // 不及格数
}

注意事项

  • 如果分支项过多,或者是分支中又存在分支,这种复杂情况不建议采用EACH管道实现,而是脚本中采用循环进行处理。
  • 引用外部任务时不能采用快捷方式声明,只能采用带任务块(闭包)方式声明。

下一步