How to use JStorm new window

前言

storm的窗口机制(即前一版的JStorm window),最大的问题是所有窗口都需要憋数据,即,消息过来的时候,并不会触发窗口计算, 只是简单地把数据堆在内存中,一直到达触发窗口计算的条件,才会拿所有的数据进行计算。显然,这种设计是不实用的,窗口稍微大一点就会爆掉了。

基于此,JStorm重新设计了窗口的实现,在2.3版本中开始提供对新窗口机制的实现。

支持的窗口

目前jstorm支持以下窗口:

1. [sliding/tumbling] count window

2. [sliding/tumbling] processing time window

3. [sliding/tumbling] event time window

4. processing time session window

5. event time session window

并支持以下时间特征:

1. processing time

2. event time

3. ingestion time(即消息进入系统的时间)

example

所有例子

新的window的接口


// 注意与storm的window的package区分
package com.alibaba.jstorm.window;

interface IWindowedBolt<T extends Tuple> extends IComponent {

    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    void cleanup();

    /**
     * 初始化窗口的状态,这个状态是用户自定义状态,当窗口第一次建立时创建,即第一条属于该窗口的消息流入时创建。
     * jstorm框架会管理每个窗口对应的用户状态。可以认为是这个窗口计算的结果以及上下文信息。
     */
    Object initWindowState(TimeWindow window);

    /**
     * 窗口消息到来时执行的计算。与storm不同,jstorm并不会憋数据,而是增量计算。因此每来一条消息都会直接
     * 触发计算以及更新窗口状态。如果一条消息属于多个窗口,那么每个窗口都会计算一次。
     *
     * @param tuple  新到达的消息
     * @param state  用户自定义状态,由框架传给用户(最开始由用户创建)。有一个限制:必须是引用类型,不能是String, Integer这种类型。
     * @param window 当前消息所属的窗口,如果有多个窗口,则execute方法本身被调用多次,每次调用时state和window都不相同。
     */
    void execute(T tuple, Object state, TimeWindow window);

    /**
     * purge一个window。当window到期时被调用。用户可以对此时的window状态做自定义操作,如存储到外部系统等。
     *
     * @param state  window状态
     * @param window 到期的窗口
     */
    void purgeWindow(Object state, TimeWindow window);
}

执行流程

整体上,jstorm的window类似于flink的window,不过没有flink的设计那么复杂。按照flink的设计,仍然存在一定的可能性(即当用户使用了Evictor的时候), 会在内存中憋数据,这不是我们所期望的。

jstorm的整体流程如下:

  1. 创建WindowedBoltExecutor

  2. 到达一条tuple,都会先抽取消息的时间(processing time / event time),然后为这条消息分配窗口(一个或多个,取决于窗口类型)。

  3. 对这条tuple,遍历步骤2分配的窗口,调用execute(T tuple, Object state, TimeWindow window)进行计算。 在计算时,如果检测到这个window对应的用户状态为空,则调用Object initWindowState()初始化window状态。

  4. 检查window是否到期,如果到期,则调用void purgeWindow(Object state, TimeWindow window),同时删除对应的window状态。

使用例子

来看一个word count的例子,所有最新window的例子,都在jstorm源码2.3分支下,sequence-split-merge工程的com.alipay.dw.jstorm.example.newindow包下。

为了叙述方便,我们把无关代码都精简了,只关注如何定义window以及用户的window实现:

package com.alipay.dw.jstorm.example.newindow;

// import xxx
// ...

public class FastWordTimeWindowTopology {
    private static Logger LOG = LoggerFactory.getLogger(FastWordTimeWindowTopology.class);

    public static class FastRandomSentenceSpout implements IRichSpout {
        SpoutOutputCollector _collector;
        Random _rand;
        long startTime;
        long sentNum = 0;
        long maxSendNum;
        int index = 0;

        private static final String[] CHOICES = {
                "JStorm is a distributed and fault-tolerant realtime computation system.",
                "Whenever a worker process crashes, ",
                "the scheduler embedded in the JStorm instance immediately spawns a new worker process to take the place of the failed one.",
                " The Acking framework provided by JStorm guarantees that every single piece of data will be processed at least once."};

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            startTime = System.currentTimeMillis();
            maxSendNum = JStormUtils.parseLong(conf.get("max.send.num"), 1000L);
        }

        @Override
        public void nextTuple() {
            if (sentNum >= maxSendNum) {
                JStormUtils.sleepMs(1);
                return;
            }

            sentNum++;
            String sentence = CHOICES[index++];
            if (index >= CHOICES.length) {
                index = 0;
            }
            _collector.emit(new Values(sentence));
            JStormUtils.sleepMs(10);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
        
        // ...
    }

    public static class SplitSentence implements IRichBolt {
        OutputCollector collector;

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split("\\s+")) {
                collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
   
        // ...
    }

    // window的实现,注意它从BaseWindowedBolt派生,因为构造window时需要BaseWindowedBolt。
    public static class WordCount extends BaseWindowedBolt<Tuple> {
        OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        // 初始化window状态,对于word count的例子,我们需要的是一个word -> count的map
        @Override
        public Object initWindowState() {
            return new HashMap<>();
        }

        // 执行消息,更新word count
        @Override
        public void execute(Tuple tuple, Object state, TimeWindow window) {
            Map<String, Integer> counts = (Map<String, Integer>) state;
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            counts.put(word, ++count);
        }

        // purge窗口。这里我们就简单地打印出这个窗口所有消息的word count。
        @Override
        public void purgeWindow(Object state, TimeWindow window) {
            Map<String, Integer> counts = (Map<String, Integer>) state;
            System.out.println("purging window: " + window);
            System.out.println("=============================");
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                System.out.println("word: " + entry.getKey() + "\tcount: " + entry.getValue());
            }
            System.out.println("=============================");
            System.out.println();
        }
    }

    static Config conf = JStormHelper.getConfig(null);

    public static void test() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new FastRandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");

        // 构造一个大小为1分钟,每隔500ms滑动一次的窗口
        builder.setBolt("count", new WordCount()
                        .timeWindow(Time.seconds(1L), Time.milliseconds(500L)),
                1).fieldsGrouping("split", new Fields("word"));

        String[] className = Thread.currentThread().getStackTrace()[1].getClassName().split("\\.");
        String topologyName = className[className.length - 1];
        JStormHelper.runTopology(builder.createTopology(), topologyName, conf, 60,
                    new JStormHelper.CheckAckedFail(conf), true);
    }

    public static void main(String[] args) throws Exception {
        conf = JStormHelper.getConfig(args);
        test();
    }
}

稍微复杂的需求

一个关于UV的常见需求是:需要每分钟统计某个页面,今天从0点开始,截至到目前为止的UV。

这种情况下,窗口仍然是1分钟的窗口,但是窗口被purge之后,我们不期望其状态被清空,而是希望今天的窗口状态可以累积下来。直到下一天的0点再被清空。

在jstorm中,这个需求非常简单,只需要稍微改一下build topology的代码即可:

        builder.setBolt("count", new WordCount()
                        .timeWindow(Time.minutes(1L))
                        .withStateWindow(Time.hours(24)),
                1).fieldsGrouping("split", new Fields("word"));

上面的代码中,只加了一行:withStateWindow(Time.hours(24)),即告诉框架,我需要累积24小时的状态数据。而用户的业务代码则无需做任何修改。

Event Time

重构后的window支持event time。但是有以下需要注意的:

TimestampExtractor

这个接口用来定义如何从你的tuple消息中抽取出event time的时间戳。通常建议你的window bolt直接实现此接口即可,不需要再单独写一个类。

WatermarkGenerator

使用event time的时候,需要定义watermark,否则jstorm会使用默认的watermark实现:PeriodicWatermarkGenerator。 即定期往下游发送watermark。

watermark标识了一个流是一直前进,不可回退的。即,假设当前收到了2017-01-13 18:00:00的watermark,那么意味着上游后续要发送的所有数据都不 会早于18:00:00。如果早于这个时间,则认为是late element。处理策略参见下面late element。

当收到一条消息时,jstorm同时会调用WatermarkGenerator#onElement方法,以更新它内部的timestamp。同时,通过watermarkInterval来定时 发送watermark和检查event windows是否需要purge。

jstorm支持几种通过watermark触发window purge的策略: * GLOBAL_MAX_TIMESTAMP 全局最大时间戳。这种策略会始终使用接收到的所有task中的最大的时间戳。如果这个时间戳>窗口边界,就会purge window * MAX_TIMESTAMP_WITH_RATIO 这个策略在上面的基础上,还指定了收到上游watermark的task的比例,默认为0.9。即,只有当时间戳>窗口边界, 且收到了90%以上的task的watermark,才会purge window * TASK_MAX_GLOBAL_MIN_TIMESTAMP 这个策略,会记录所有上游task发送的watermark的值,然后取所有task的watermark的最小值作为当前时间戳, 以防止各别task的timestamp很大导致窗口被过早purge。这种策略是jstorm的默认策略。

乱序消息的处理

event time的场景中,消息的乱序几乎是必然会出现的。 在上面的场景中即为,当前watermark已经到达18:00:00,但是下一条消息到达时,发现它的时间戳是17:30:00。这就是一条乱序的消息。

默认情况下,jstorm会丢弃这条消息。也可以通过实现Retractor接口来重新计算一个已经purge的窗口值。见下面。

Retractor

这个接口只有一个方法:

void retract(Tuple element, Collection<TimeWindow> windows);

即,这个element所属的窗口为windows这个集合,由用户指定如何处理这条乱序消息。

以word count为例,我们可能每隔1分钟计算word count,然后最终把每个window下的word count输出到hbase或者tair中。

在watermark=18:00:00时,假设之前计算出来的17:30分这个窗口的word: aa的count=100。接下来我们又收到一条消息,timestamp=17:30:00,word=aa。

此时我们需要更新HBase/Tair中17:30分这个窗口中对应的aa的count值。那么我们可以在retract方法中,直接去update HBase或者tair。

当然,如果经常会发生乱序,一条条处理效率显然是太慢了。建议用户可以保存一个Map<TimeWindow, List<Tuple>>(或者使用guava中的Multimap)对象, 当达到一定数量再触发一次计算。

不过说了这么多,使用retractor的一个最大前提还是:计算结果必须是可被更新的。否则就只能丢弃然后打个日志了。

使用

请参考com.alibaba.jstorm.window.BaseWindowedBolt类中提供的接口。

示例程序

  1. 在jstorm源码sequence-split-merge工程中,com.alipay.dw.jstorm.example.newindow package下有大量的window的使用示例代码。

  2. http://gitlab.alibaba-inc.com/aloha/tt-window-test 提供了一个完整的从真实线上TT的aplus topic接消息,使用event time进行处理的例子。

注意点

  1. 需要注意的是,jstorm的window机制不能跟acker系统一起正确地工作。因此如果你的业务重度依赖于acker来实现错误处理,那么不能使用最新的window机制。