面向 API 编程: Structured Streaming

 

Structured Streaming

Structured Streaming 概述

Structured Streaming 的关键思想是将实时数据流视为一张正在不断添加数据的表. 可以把流计算等同于在一个静态表上的批处理查询, Spark 会在不断添加数据的无界输入表上运行计算, 并进行增量查询

无界表上对输入的查询将生成结果表, 系统每隔一定的周期会触发对无界表的计算并更新结果表

两种处理模型

微批处理

Structured Streaming 默认使用微批处理执行模型, 这意味着 Spark流计算引擎会定期检查流数据源, 并对自上一批次结束后到达的新数据执行批量查询

数据到达和得到处理并输出结果之间的延时超过 100 毫秒

持续处理

  • Spark 从 2.3.0 版本开始引入了持续处理的试验性功能, 可以实现流计算的毫秒级延迟
  • 在持续处理模式下, Spark 不再根据触发器来周期性启动任务, 而是启动一系列的连续读取, 处理和写入结果的长时间运行的任务

Structured Streaming 和 Spark SQL, Spark Streaming 关系

  • Structured Streaming 可以对 DataFrame/Dataset 应用前面章节提到的各种操作, 包括 select, where, groupBy, map, filter, flatMap 等.
  • Spark Streaming 只能实现秒级的实时响应, 而 StructuredStreaming 由于采用了全新的设计方式, 采用微批处理模型时可以实现100 毫秒级别的实时响应, 采用持续处理模型时可以支持毫秒级的实时响应.

Structured Streaming 程序编写

实例任务: 一个包含很多行英文语句的数据流源源不断到达, Structured Streaming 程序对每行英文语句进行拆分, 并统计每个单词出现的频率

# 由于程序中需要用到拆分字符串和展开数组内的所有单词的功能
# 所以引用了来自 `pyspark.sql.functions` 里面的 split 和 explode 函数
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode


# 创建 SparkSession 对象
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

    # 创建输入数据源
    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()

    # 定义流计算过程
    words = lines.select(
      explode(
          split(lines.value, " ")
      ).alias("word")
    )
    wordCounts = words.groupBy("word").count()


    # 启动流计算并输出结果
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .trigger(processingTime="8 seconds") \
        .start()
    query.awaitTermination()

输入源

File 源

File 源 (或称为”文件源”) 以文件流的形式读取某个目录中的文件, 支持的文件格式为 csv, json, orc, parquet, text 等.

需要注意的是, 文件放置到给定目录的操作应当是原子性的, 即不能长时间在给定目录内打开文件写入内容, 而是应当采取大部分操作系统都支持的, 通过写入到临时文件后移动文件到给定目录的方式来完成.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 导入需要用到的模块
import os
import shutil
from pprint import pprint
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType

# 定义 JSON 文件的路径常量
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

if __name__ == "__main__":
    # 定义模式, 为时间戳类型的 eventTime, 字符串类型的操作和省份组成
    schema = StructType([
        StructField("eventTime", TimestampType(), True),
        StructField("action", StringType(), True),
        StructField("district", StringType(), True)])
    spark = SparkSession \
        .builder \
        .appName("StructuredEMallPurchaseCount") \
        .getOrCreate()
 
    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("json") \
        .schema(schema) \
        .option("maxFilesPerTrigger", 100) \
        .load(TEST_DATA_DIR_SPARK)

    # 定义窗口
    windowDuration = '1 minutes'
    windowedCounts = lines \
        .filter("action = 'purchase'") \
        .groupBy('district', window('eventTime', windowDuration)) \
        .count() \
        .sort(asc('window'))

    # 启动流计算
    query = windowedCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .option('truncate', 'false') \
        .trigger(processingTime="10 seconds") \
        .start()
    query.awaitTermination()

输出操作

启动流计算

DataFrame/Dataset 的 .writeStream() 方法将会返回 DataStreamWriter 接口, 接口通过 .start() 真正启动流计算, 并将 DataFrame/Dataset 写入到外部的输出接收器, DataStreamWriter 接口有以下几个主要函数

  1. format: 接收器类型.
  2. outputMode: 输出模式, 指定写入接收器的内容, 可以是 Append 模式, Complete 模式或 Update 模式.
  3. queryName: 查询的名称, 可选, 用于标识查询的唯一名称.
  4. trigger: 触发间隔, 可选, 设定触发间隔, 如果未指定, 则系统将在上一次处理完成后立即检查新数据的可用性. 如果由于先前的处理尚未完成导致超过触发间隔, 则系统将在处理完成后立即触发新的查询

输出模式

输出模式用于指定写入接收器的内容, 主要有以下几种:

  • Append 模式:只有结果表中自上次触发间隔后增加的新行, 才会被写入外部存储器. 这种模式一般适用于”不希望更改结果表中现有行的内容”的使用场景.
  • Complete 模式:已更新的完整的结果表可被写入外部存储器.
  • Update 模式:只有自上次触发间隔后结果表中发生更新的行, 才会被写入外部存储器. 这种模式与 Complete 模式相比, 输出较少, 如果结果表的部分行没有更新, 则不会输出任何内容. 当查询不包括聚合时, 这个模式等同于 Append 模式.

输出接收器

系统内置的输出接收器包括 File 接收器, Kafka 接收器, Foreach 接 收器, Console 接收器, Memory 接收器等, 其中, Console 接收器 和 Memory 接收器仅用于调试用途. 有些接收器由于无法保证输出 的持久性, 导致其不是容错的. 以 File 接收器为例, 这里把 7.2 节的实例修改为使用 File 接收器, 修改后的代码文件为 StructuredNetworkWordCountFileSink.py

#!/usr/bin/env python3
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import length


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCountFileSink") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('WARN')

    lines = spark \
        .readStream \
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()


    words = lines.select(
        explode(
            split(lines.value, " ")
            ).alias("word")
        )
    all_length_5_words = words.filter(length("word") == 5)
    query = all_length_5_words \
        .writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "file:///tmp/filesink") \
        .option("checkpointLocation", "file:///tmp/file-sink-cp") \
        .trigger(processingTime="8 seconds") \
        .start()
    query.awaitTermination()

由于程序执行后不会在终端输出信息, 这时可新建一个终端, 执行如下命令查看 File 接收器保存的位置:

$ cd /tmp/filesink
$ ls

可以看到以 parquet 格式保存的类似如下的文件列表:

part-00000-2bd184d2-e9b0-4110-9018-a7f2d14602a9-c000.snappy.parquet
part-00000-36eed4ab-b8c4-4421-adc6-76560699f6f5-c000.snappy.parquet
part-00000-dde601ad-1b49-4b78-a658-865e54d28fb7-c000.snappy.parquet
part-00001-eedddae2-fb96-4ce9-9000-566456cd5e8e-c000.snappy.parquet
_spark_metadata

可以使用 strings 命令查看文件内的字符串, 具体如下:

$ strings part-00003-89584d0a-db83-467b-84d8-53d43baa4755-c000.snappy.parquet

References