storm整合springboot Storm框架整合springboot的方法

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

storm整合springboot Storm框架整合springboot的方法

本拉邓   2021-03-30 我要评论

Storm:最火的流式处理框架

伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦。

•实现一个实时计算系统

全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。

1.低延迟。都说了是实时计算系统了,延迟是一定要低的。

2.高性能。性能不高就是浪费机器,浪费机器是要受批评的哦。

3.分布式。系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。

4.可扩展。伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。

5.容错。这是分布式系统中通用问题。一个节点挂了不能影响我的应用。

好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。我们再继续往下看。

1.容易在上面开发应用程序。亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。

2.消息不丢失。用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。这个要求有点高哦。

诞 生

 在2011年Storm开源之前,由于Hadoop的火红,整个业界都在喋喋不休地谈论大数据。Hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,Hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。

有需求也就有创造,在Hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补Hadoop的实时性为目标而被创造出来。而在这个节骨眼上Storm横空出世了。

Storm带着流式计算的标签华丽丽滴出场了,看看它的一些卖点:

•分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。

•运维简单:Storm的部署的确简单。虽然没有Mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。

•高度容错:模块都是无状态的,随时宕机重启。

•无数据丢失:Storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。

•多语言:实际上,Storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用Java实现。

下面介绍下Storm框架整合springboot的方法

我们知道Storm本身是一个独立运行的分布式流式数据处理框架,Springboot也是一个独立运行的web框架。那么如何在Strom框架中集成Springboot使得我们能够在Storm开发中运用Spring的Ioc容器及其他如Spring Jpa等功能呢?我们先来了解以下概念:

•Storm主要的三个Component:Topology、Spout、Bolt。Topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。

•SpringApplication:是配置Spring应用上下文的起点。通过调用SpringApplication.run()方法它将创建ApplicationContext实例,这是我们能够使用Ioc容器的主要BeanFactory。之后Spring将会加载所有单例模式的beans,并启动后台运行的CommandLineRunner beans等。

•ApplicationContextAware:这是我们能够在普通Java类中调用Spring容器里的beans的关键接口。

实现原理

Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化Spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到Spring的ApplicationContext,有了ApplicationContext实例对象,Spring的所有功能就都能用上了。

•Spout.open方法实现

@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
 //启动Springboot应用
 SpringStormApplication.run();

 this.map = map;
 this.topologyContext = topologyContext;
 this.spoutOutputCollector = spoutOutputCollector;
}

•Bolt.prepare方法实现

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
 //启动Springboot应用
 SpringStormApplication.run();

 this.map = map;
 this.topologyContext = topologyContext;
 this.outputCollector = outputCollector;
}
•SpringStormApplication启动类
@SpringBootApplication
@ComponentScan(value = "com.xxx.storm")
public class SpringStormApplication {
 /**
  * 非工程启动入口,所以不用main方法
  * @param args
  */
 public static void run(String ...args) {
  SpringApplication app = new SpringApplication(SpringStormApplication.class);
  //我们并不需要web servlet功能,所以设置为WebApplicationType.NONE
  app.setWebApplicationType(WebApplicationType.NONE);
  //忽略掉banner输出
  app.setBannerMode(Banner.Mode.OFF);
  //忽略Spring启动信息日志
  app.setLogStartupInfo(false);
  app.run(args);
 }
}

与我们传统的Springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:

@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
 public static void main(String[] args) {
  SpringApplication.run(PlatformApplication.class, args);
 }
}

•在spout/bolt中调用了SpringStormApplication.run方法后,我们还需要能够拿到ApplicationContext容器对象,这时候我们还需要实现ApplicationContextAware接口,写个工具类BeanUtils:

@Component
public class BeanUtils implements ApplicationContextAware {
 private static ApplicationContext applicationContext = null;
 @Override
 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  if (BeanUtils.applicationContext == null) {
   BeanUtils.applicationContext = applicationContext;
  }
 }
 public static ApplicationContext getApplicationContext() {
  return applicationContext;
 }
 public static Object getBean(String name) {
  return getApplicationContext().getBean(name);
 }
 public static <T> T getBean(Class<T> clazz) {
  return getApplicationContext().getBean(clazz);
 }
 public static <T> T getBean(String name, Class<T> clazz) {
  return getApplicationContext().getBean(name, clazz);
 }
}

通过@Component注解使得Spring在启动时能够扫描到该bean,因为BeanUtils实现了ApplicationContextAware接口,Spring会在启动成功时自动调用BeanUtils.setApplicationContext方法,将ApplicationContext对象保存到工具类的静态变量中,之后我们就可以使用BeanUtils.getBean()去获取Spring容器中的bean了。

写个简单例子

•在FilterBolt的execute方法中获取Spring bean

@Override
public void execute(Tuple tuple) {
 FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
 filterService.deleteAll();
}

•定义FilterService类,这时候我们就可以使用Spring的相关注解,自动注入,Spring Jpa等功能了。

@Service("filterService")
public class FilterService {
 @Autowired
 UserRepository userRepository;
 
 public void deleteAll() {
  userRepository.deleteAll();
 }
}

将storm应用作为Springboot工程的一个子模块

工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动Strom的topology应用会有一个日志包依赖冲突。

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将Springboot的相关日志包排除掉,如下:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter</artifactId>
 <exclusions>
  <exclusion>
   <groupId>org.apache.logging.log4j</groupId>
   <artifactId>log4j-to-slf4j2</artifactId>
  </exclusion>
  <exclusion>
   <groupId>ch.qos.logback</groupId>
   <artifactId>logback-classic2</artifactId>
  </exclusion>
 </exclusions>
</dependency>

总结

以上所述是小编给大家介绍的Storm框架整合springboot的方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

猜您喜欢

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们