如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  从上面的图片可以清楚的了解到各个模块所处的位置。这篇文章主要是讲述开发Spark Streaming这块,因为Flume-ng这块不需要特别的处理,完全和Flume-ng之间的交互一样。所有的Spark Streaming程序都是以JavaStreamingContext作为切入点的。如下:

JavaStreamingContext jssc = 
    new JavaStreamingContext(master, appName, 
                             new Duration(1000), 
                             [sparkHome], [jars]);
JavaDStream<SparkFlumeEvent> flumeStream = 
                             FlumeUtils.createStream(jssc, host, port);

最后需要调用JavaStreamingContext的start方法来启动这个程序。如下:

jssc.start();
jssc.awaitTermination();

整个程序如下:

package scala;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import java.nio.ByteBuffer;

/**
 * User: 过往记忆
 * Date: 14-7-8
 * Time: 下午23:16
 * bolg: 
 * 本文地址:/archives/1063
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */
public static void JavaFlumeEventTest(String master, String host, int port) {
        Duration batchInterval = new Duration(2000);

        JavaStreamingContext ssc = new JavaStreamingContext(master, 
               "FlumeEventCount", batchInterval,
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
        StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
        JavaDStream<SparkFlumeEvent> flumeStream = 
                FlumeUtils.createStream(ssc, host, port, storageLevel);

        flumeStream.count().map(new Function<java.lang.Long, String>() {
            @Override
            public String call(java.lang.Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
}

然后开启Flume往这边发数据,在Spark的这端可以接收到数据:

如果你对Scala比较熟悉,下面是一段Scala的程序,功能和上面的一样:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

/**
 * User: 过往记忆
 * Date: 14-7-8
 * Time: 下午23:16
 * bolg: 
 * 本文地址:/archives/1063
 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
 * 过往记忆博客微信公共帐号:iteblog_hadoop
 */

def ScalaFlumeEventTest(master : String, host : String, port : Int) {
    val batchInterval = Milliseconds(2000)

    val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

    val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY)

    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
    ssc.start()
    ssc.awaitTermination()
}

  以上程序都是在Spark tandalone Mode下面运行的,如果你想在YARN上面运行,也是可以的,不过需要做点修改。具体怎么在Yarn上面运行,请参见官方文档。

本博客文章除特别声明,全部都是原创!
原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
本文链接: 【Spark和Flume-ng整合】(https://www.iteblog.com/archives/1063.html)
喜欢 (17)
分享 (0)
发表我的评论
取消评论

表情
本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
(4)个小伙伴在吐槽
  1. 打卡

    洛克鬼泣2019-04-02 20:17 回复
  2. 你好,我想请问下,yarn模式下跟flume的结合,你有没有试过,如果有试过的话,请点拨一下,在这里卡了很久!不胜感激!

    dingke2014-09-04 22:20 回复
  3. yarn模式跟flume结合的代码和上面类似。

    w3970907702014-10-20 13:52 回复
  • 小赞一下,之前五月份的时候开始搞flume,博主的一些入门文章写的很好,现在开始连接spark博主又已经写好了

    fc2014-08-27 14:37 回复