This article was last updated on 2022-08-28 11:30:56
map
    val streamMap = stream.map { x => x * 2 }
flatMap
    val streamFlatMap = stream.flatMap {
      x => x.split(" ")
    }
filter
    val streamFilter = stream.filter {
      x => x == 1
    }
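The three snippets above assume different element types for `stream` (numbers for map and filter, strings for flatMap). A minimal sketch of the same three transformations, using plain Scala collections as a stand-in for a DataStream, makes the input and output types explicit. The object and method names are hypothetical, and no Flink runtime is involved.

```scala
// Plain Scala collections stand in for DataStream here; this is not Flink code.
object BasicTransformSketch {
  // map: exactly one output element per input element
  def doubled(xs: Seq[Int]): Seq[Int] = xs.map(x => x * 2)

  // flatMap: zero or more output elements per input element
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(line => line.split(" ").toSeq)

  // filter: keeps only the elements for which the predicate is true
  def onlyOnes(xs: Seq[Int]): Seq[Int] = xs.filter(x => x == 1)
}
```

The Flink operators take the same lambdas; the difference is that they are applied to each element as it arrives rather than to a finished collection.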
KeyBy
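DataStream → KeyedStream: logically splits a stream into disjoint partitions, where each partition contains the elements that share the same key. Internally this is implemented by hashing the key.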
    def keyBy(fields: Int*): KeyedStream[T, JavaTuple]
    def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple]
    def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
    def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K]
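To make the "disjoint partitions by hash" idea concrete, here is a simplified model in plain Scala. It assumes the partition is simply the key's hash code modulo the partition count; Flink's real key-to-partition assignment is more involved, so treat this only as an illustration. `KeyBySketch`, `partitionOf` and `partitionBy` are hypothetical names.

```scala
// Simplified model of hash partitioning; not Flink's actual assignment scheme.
object KeyBySketch {
  // Assumption: partition index = non-negative (hash of key) mod number of partitions.
  def partitionOf[K](key: K, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Group elements by the partition their key hashes to.
  // Elements with equal keys always end up in the same partition.
  def partitionBy[T, K](elems: Seq[T], numPartitions: Int)(keyOf: T => K): Map[Int, Seq[T]] =
    elems.groupBy(e => partitionOf(keyOf(e), numPartitions))
}
```

For example, `KeyBySketch.partitionBy(Seq(("a", 1), ("b", 2), ("a", 3)), 4)(_._1)` places both `"a"` elements in one partition, while `"b"` may or may not share it, depending on the hash.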
Rolling aggregation operators
These operators aggregate each keyed substream of a KeyedStream separately.
sum
    def sum(position: Int): DataStream[T]
    def sum(field: String): DataStream[T]
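"Rolling" means that an updated result is emitted for every incoming element, not once at the end. The following sketch models a keyed rolling sum over `(key, value)` pairs with plain Scala collections; it only illustrates the semantics and does not use the Flink API. `RollingSumSketch` and `rollingSum` are hypothetical names.

```scala
// Model of keyBy(key).sum(value) semantics on a finite list of events; not Flink code.
object RollingSumSketch {
  // For every incoming (key, value) pair, emit the running sum for that key so far.
  def rollingSum(events: Seq[(String, Int)]): Seq[(String, Int)] = {
    val (_, out) = events.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
      case ((state, emitted), (key, value)) =>
        val total = state.getOrElse(key, 0) + value // per-key state, as in a KeyedStream
        (state.updated(key, total), emitted :+ (key -> total))
    }
    out
  }
}
```

Each key keeps its own running total, so the sums of `"a"` and `"b"` never mix.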
min&minBy
    def min(position: Int): DataStream[T]
    def min(field: String): DataStream[T]
    def minBy(position: Int): DataStream[T]
    def minBy(field: String): DataStream[T]
max&maxBy
    def max(position: Int): DataStream[T]
    def max(field: String): DataStream[T]
    def maxBy(position: Int): DataStream[T]
    def maxBy(field: String): DataStream[T]
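The signatures of min and minBy (and of max and maxBy) are identical, so the difference is easy to miss: min tracks the minimum value of the field, while minBy returns the element that holds that minimum. The sketch below models this for a single key in plain Scala. The `Reading` case class is hypothetical, and the choice that the min-like version keeps the other fields of the first element is an assumption of this sketch, since those fields are not something to rely on.

```scala
// Model of min vs. minBy for the elements of one key; not Flink code.
object MinVsMinBySketch {
  final case class Reading(id: String, timestamp: Long, temperature: Double)

  // min-like: only the temperature field is updated;
  // the other fields stay those of the first element (assumption of this sketch).
  def rollingMin(readings: Seq[Reading]): Seq[Reading] =
    if (readings.isEmpty) Seq.empty
    else readings.tail.scanLeft(readings.head) { (acc, r) =>
      acc.copy(temperature = math.min(acc.temperature, r.temperature))
    }

  // minBy-like: the whole element with the smallest temperature is kept.
  def rollingMinBy(readings: Seq[Reading]): Seq[Reading] =
    if (readings.isEmpty) Seq.empty
    else readings.tail.scanLeft(readings.head) { (acc, r) =>
      if (r.temperature < acc.temperature) r else acc
    }
}
```

If the timestamp that belongs to the minimum temperature matters, the minBy-style behaviour is the one to reach for; max and maxBy are symmetrical.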
reduce
    def reduce(fun: (T, T) => T): DataStream[T]
    def reduce(reducer: ReduceFunction[T]): DataStream[T]
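reduce generalizes the built-in aggregations: the function receives the previously reduced value and the new element, and its result becomes both the emitted output and the new state for that key. A rough model in plain Scala follows; `RollingReduceSketch` and `rollingReduce` are hypothetical names and nothing here calls Flink.

```scala
// Model of keyed reduce semantics on a finite list of events; not Flink code.
object RollingReduceSketch {
  // Apply f(previousResult, newValue) per key and emit the result after every element.
  // The first element of a key is emitted unchanged, since there is nothing to combine it with.
  def rollingReduce[K, T](events: Seq[(K, T)])(f: (T, T) => T): Seq[(K, T)] = {
    val (_, out) = events.foldLeft((Map.empty[K, T], Vector.empty[(K, T)])) {
      case ((state, emitted), (key, value)) =>
        val reduced = state.get(key).map(prev => f(prev, value)).getOrElse(value)
        (state.updated(key, reduced), emitted :+ (key -> reduced))
    }
    out
  }
}
```

Passing `(a, b) => math.max(a, b)` gives a rolling maximum, and `(a, b) => a + b` gives a rolling sum, which is why the input and output types of reduce must be the same.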
split
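DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some characteristic of the elements. Note that split and select were deprecated and have been removed from recent Flink releases, where side outputs take their place.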

select
SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.

    val splitStream: SplitStream[String] = stream
      .split( data => {
        if (data.length > 5) Seq("high") else Seq("low")
      } )
    val high: DataStream[String] = splitStream.select("high")
    val low: DataStream[String] = splitStream.select("low")
    val all: DataStream[String] = splitStream.select("high", "low")
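One way to picture split and select: split attaches a list of tags to every element, and select returns the elements that carry at least one of the requested tags. The model below expresses that in plain Scala; the `Split` case class and the `split` helper are hypothetical and only mimic the semantics of the code above.

```scala
// Model of split/select tagging semantics; not Flink code.
object SplitSelectSketch {
  // A "split stream" is modelled as the elements paired with the tags the selector assigned.
  final case class Split[T](tagged: Seq[(T, Seq[String])]) {
    // select returns every element carrying at least one of the requested tags
    def select(tags: String*): Seq[T] =
      tagged.collect {
        case (elem, elemTags) if elemTags.exists(t => tags.contains(t)) => elem
      }
  }

  // split evaluates the selector once per element and remembers the resulting tags
  def split[T](elems: Seq[T])(selector: T => Seq[String]): Split[T] =
    Split(elems.map(e => (e, selector(e))))
}
```

With the selector from the example, strings longer than five characters are tagged "high" and all others "low", so selecting both tags returns every element.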
Connect
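DataStream, DataStream → ConnectedStreams: connects two data streams while preserving their types. After Connect, the two streams are merely placed inside one stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.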

CoMap,CoFlatMap
ConnectedStreams → DataStream: applied to a ConnectedStreams. They work like map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams.

    // high and low are assumed here to be streams of sensor readings with id and temperature fields
    val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
    val connected = warning.connect(low)
    val coMap = connected.map(
      warningData => (warningData._1, warningData._2, "warning"),
      lowData => (lowData.id, "healthy")
    )
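The key point of CoMap is that each input keeps its own type until its own function converts it to the shared output type. A small model in plain Scala, using `Either` to mark which input an element came from, is sketched below. The names are hypothetical, and unlike a real connected stream, which interleaves elements in arrival order, this model simply concatenates the two inputs.

```scala
// Model of connect + coMap semantics; not Flink code.
object CoMapSketch {
  // A connected stream is modelled as a sequence whose elements come from either input.
  // Simplification: the first input is followed by the second instead of being interleaved.
  def connect[A, B](first: Seq[A], second: Seq[B]): Seq[Either[A, B]] =
    first.map(Left[A, B](_)) ++ second.map(Right[A, B](_))

  // coMap applies one function per input; both must produce the same result type R.
  def coMap[A, B, R](connected: Seq[Either[A, B]])(f1: A => R, f2: B => R): Seq[R] =
    connected.map {
      case Left(a)  => f1(a)
      case Right(b) => f2(b)
    }
}
```

In the example above the two functions return tuples of different arity, so the resulting element type is only their common supertype; returning one shared type from both functions, as in this sketch, is usually easier to work with downstream.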
union
DataStream → DataStream: performs a union of two or more DataStreams, producing a new DataStream that contains all elements of all the input DataStreams.

    val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
    unionStream.print("union:::")
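Differences between Connect and Union:
- Union requires the streams to have the same type beforehand; Connect allows different types, which can then be converted to a common type in the subsequent coMap.
- Connect can only operate on two streams; Union can operate on multiple streams.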
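Both constraints on Union show up directly in a type signature. In the plain-Scala model below, `union` takes a variable number of inputs, but a single type parameter `T` forces all of them to share one element type. `UnionSketch` is a hypothetical name, and the model concatenates the inputs, whereas a real union makes no such ordering promise across streams.

```scala
// Model of union semantics; not Flink code.
object UnionSketch {
  // Any number of inputs is accepted, but all must have the same element type T.
  // Simplification: inputs are concatenated in argument order.
  def union[T](first: Seq[T], others: Seq[T]*): Seq[T] =
    others.foldLeft(first)(_ ++ _)
}
```

For instance, `UnionSketch.union(Seq(1, 2), Seq(3), Seq(4, 5))` merges three inputs of type `Int` in one call; combining inputs of genuinely different types is the case Connect is meant for.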