0
点赞
收藏
分享

微信扫一扫

本地模式运行storm的demo


本例实现的是本地模式运行storm的wordcount demo!

开发过程中,可以用本地模式来运行Storm,这样就能在本地开发,在进程中测试Topology。一切就绪后,以远程模式运行 Storm,提交用于在集群中运行的Topology。

创建工程:demo-storm
目录结构如下:
demo-storm
——src/main/java
————com.youku.demo
————————bolts
————————spouts
——src/test/java
——src/main/resource
————words.txt



本地模式运行storm的demo_ide

storm-demo工程目录



WordCounter.java:


package com.youku.demo.bolts;
 
import java.util.HashMap;
import java.util.Map;
 
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
 
public class WordCounter extends BaseBasicBolt {
 
    Integer id;
    String name;
    Map<String, Integer> counters;
 
    /**
     * At the end of the spout (when the cluster is shutdown
     * We will show the word counters
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter ["+name+"-"+id+"] --");
        for(Map.Entry<String, Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }
 
    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }
 
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
 
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word dosn't exist in the map we will create
         * this, if not We will add 1
         */
        if(!counters.containsKey(str)){
            counters.put(str, 1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}



WordNormalizer.java:


package com.youku.demo.bolts;
 
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
 
public class WordNormalizer extends BaseBasicBolt {
 
    public void cleanup() {}
 
    /**
     * The bolt will receive the line from the
     * words file and process it to Normalize this line
     *
     * The normalize will be put the words in lower case
     * and split the line to get all words in this
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for(String word : words){
            word = word.trim();
            if(!word.isEmpty()){
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }
 
    /**
     * The bolt will only emit the field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}



WordReader.java:



package com.youku.demo.spouts;
 
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
 
public class WordReader extends BaseRichSpout {
 
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;
    public void ack(Object msgId) {
        System.out.println("OK:"+msgId);
    }
    public void close() {}
    public void fail(Object msgId) {
        System.out.println("FAIL:"+msgId);
    }
 
    /**
     * The only thing that the methods will do It is emit each
     * file line
     */
    public void nextTuple() {
        /**
         * The nextuple it is called forever, so if we have been readed the file
         * we will wait and then return
         */
        if(completed){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                //Do nothing
            }
            return;
        }
        String str;
        //Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            //Read all lines
            while((str = reader.readLine()) != null){
                /**
                 * By each line emmit a new value with the line as a their
                 */
                this.collector.emit(new Values(str),str);
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }
 
    /**
     * We will create the file and get the collector object
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }
 
    /**
     * Declare the output field "word"
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}



TopologyMain.java:


package com.youku.demo;
 
import com.youku.demo.bolts.WordCounter;
import com.youku.demo.bolts.WordNormalizer;
import com.youku.demo.spouts.WordReader;
 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
 
public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
 
        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader",new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
            .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),1)
            .fieldsGrouping("word-normalizer", new Fields("word"));
 
        //Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(true);
        //Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}



pom.xml:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <groupId>com.youku.demo</groupId>
    <artifactId>demo-storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>demo-storm</name>
    <url>http://maven.apache.org</url>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <!-- Repository where we can found the storm dependencies -->
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
 
    <dependencies>
        <!-- Storm Dependency -->
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>



words.txt:


storm
test
are
great
is
an
storm
simple
application
but
very
powerfull
really
StOrm
is
great



运行的时候需要配置参数:src/main/resources/words.txt 指定输入文件



本地模式运行storm的demo_ide_02

运行命令



日志输出:



本地模式运行storm的demo_Storm_03

运行日志



会报好多zookeeper异常,还有最后的日志文件无法删除的异常,目前忽略了,O(∩_∩)O呵呵~


271  [main-SendThread(localhost:2000)] WARN  org.apache.zookeeper.ClientCnxn  - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.SocketException: Address family not supported by protocol family: connect
    at sun.nio.ch.Net.connect(Native Method)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
    at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1050)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1077)
 
java.io.IOException: Unable to delete file: C:\Users\ThinkPad\AppData\Local\Temp\3fbb080f-e585-42e6-8b1b-d6ae024503ac\version-2\log.1
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1390)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at org.apache.commons.io.FileUtils.cleanDirectory(FileUtils.java:1044)
    at org.apache.commons.io.FileUtils.deleteDirectory(FileUtils.java:977)
    at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:1381)
    at backtype.storm.util$rmr.invoke(util.clj:413)
    at backtype.storm.testing$kill_local_storm_cluster.invoke(testing.clj:163)
    at backtype.storm.LocalCluster$_shutdown.invoke(LocalCluster.clj:21)
    at backtype.storm.LocalCluster.shutdown(Unknown Source)
    at com.youku.demo.TopologyMain.main(TopologyMain.java:33)



下载Pdf

举报

相关推荐

0 条评论