准备给即将做的一个项目搭建一个springboot的微服务架构,中间集成多种中间件,根据实际情况会优化中间件.
技术使用:springboot,Kafka,redis,shiro,swagger,Feign等.
一. 通过IDEA新建一个SpringBoot项目
1.创建springboot项目
注:如果出现’https://start.spring.io’ 的初始化失败 请多刷新几次
2.添加启动类
3.创建测试类TestController
4.在入口类Application中启动项目,启动成功如下图
5.打开浏览器输入http://localhost:8080/,测试成功
6.添加SpringBoot的配置文件application.properties,配置服务访问端口号
二. SpringBoot与mybatis整合
1.添加maven依赖
mysql
mysql-connector-java
com.alibaba
druid-spring-boot-starter
1.1.9
log4j
log4j
1.2.17
org.springframework.boot
spring-boot-starter-data-jpa
org.mybatis.spring.boot
mybatis-spring-boot-starter
1.1.1
2.创建entity、service、dao
(1).User实体类
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
@Getter
@Setter
@ToString
public class User implements Serializable {
private static final long serialVersionUID = 15615615461561151L;
private String loginName;
private String password;
}
(2).Mapper接口类
import com.iflytek.bim.cop.domain.entity.User;
import java.util.List;
public interface UserMapper {
List getAll();
}
(3).Service接口
import com.iflytek.bim.cop.domain.entity.User;
import java.util.List;
public interface IUserService{
List getAll();
}
(4).Service实现类
import java.util.List;
@Service
public class UserServiceImpl implements IUserService {
@Autowired
private UserMapper userMapper;
public List getAll() {
return userMapper.getAll();
}
}
3.在application.properties配置数据源以及mybatis
#数据源
jasypt.encryptor.password=EbfYkitulv73I2p0mXI50JMXoaxZTKJ7JCCT
spring.datasource.driverClassName=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://172.31.201.66:5432/test?useUnicode=true&characterEncoding=utf8
spring.datasource.username=postgres
spring.datasource.password=8989yyu
#阿里druid连接池驱动配置信息
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
#连接池的配置信息
#初始化大小,最小,最大
spring.datasource.initialSize=2
spring.datasource.minIdle=2
spring.datasource.maxActive=3
#配置获取连接等待超时的时间
spring.datasource.maxWait=6000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
spring.datasource.timeBetweenEvictionRunsMillis=60000
#配置一个连接在池中最小生存的时间,单位是毫秒
spring.datasource.minEvictableIdleTimeMillis=300000
spring.datasource.validationQuery=SELECt 1 FROM DUAL
spring.datasource.testWhileIdle=true
spring.datasource.testOnBorrow=false
spring.datasource.testOnReturn=false
#打开PSCache,并且指定每个连接上PSCache的大小
spring.datasource.poolPreparedStatements=true
spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
#配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
spring.datasource.filters=stat,wall,log4j
#通过connectProperties属性来打开mergeSql功能;慢SQL记录
spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
#mybatis配置
#配置Mapper文件存放的路径
mybatis.mapper-locations=classpath:mapper
@Controller
public class TestController {
@Autowired
private IUserService userService;
@ResponseBody
@RequestMapping("/")
public List test() {
return userService.getAll();
}
}
6.启动项目,在浏览器输入http://localhost:8020/,json数据成功返回,整合mybaties成功
三.整合Redis
1.添加redis-maven依赖
org.springframework.boot
spring-boot-starter-data-redis
org.apache.commons
commons-pool2
2.配置文件中添加redis配置信息
# 缓存服务配置 spring.cache.type=redis spring.redis.host=172.31.201.66 spring.redis.password=ENC(JrHjkKCEYDvsg2KhjPTv6j1LCQSYUAU4) spring.redis.port=6379 spring.redis.database=0 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 redis.server.connectionTimeout=5000 redis.server.soTimeout=5000 redis.server.weight=1 redis.server.blockWhenExhausted=true redis.server.maxWaitMillis=10000 redis.server.minIdle=50 redis.server.testWhileIdle=true redis.server.numTestsPerEvictionRun=30 redis.server.testOnBorrow=true redis.server.maxIdle=100 redis.server.maxTotal=1024
3.创建Redis服务接口以及实现类
public interface IRedisService {
void setValue(String key,Object value);
Object getValue(String key);
void setValue(String key,Object value,Integer exceed,Integer timeType);
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class RedisServiceImpl implements IRedisService {
@Autowired
private RedisTemplate redisTemplate;
@SuppressWarnings("unchecked")
@Override
public void setValue(String key,Object value) {
redisTemplate.opsForValue().set(key, value);
}
@SuppressWarnings("unchecked")
@Override
public Object getValue(String key) {
if(!redisTemplate.hasKey(key)){
return null;
}else{
return redisTemplate.opsForValue().get(key);
}
}
@SuppressWarnings("unchecked")
@Override
public void setValue(String key, Object value, Integer exceed, Integer timeType) {
switch (timeType) {
case 0://天
redisTemplate.opsForValue().set(key, value, exceed, TimeUnit.DAYS);
break;
case 1://小时
redisTemplate.opsForValue().set(key, value, exceed, TimeUnit.HOURS);
break;
case 2: //分钟
redisTemplate.opsForValue().set(key, value, exceed, TimeUnit.MINUTES);
break;
case 3: //秒
redisTemplate.opsForValue().set(key, value, exceed, TimeUnit.SECONDS);
break;
case 4://毫秒
redisTemplate.opsForValue().set(key, value, exceed, TimeUnit.MICROSECONDS);
break;
}
}
}
4.测试Redis
import com.iflytek.bim.cop.domain.entity.User;
import com.iflytek.bim.cop.api.IUserService;
import com.iflytek.bim.cop.service.redis.IRedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
@Controller
public class TestController {
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private IUserService userService;
@Autowired
private IRedisService redisService;
@ResponseBody
@RequestMapping("/")
public List test() {
//存入redis并设置10秒的有效期
redisService.setValue("redisKey", ccwwee, 3, 3);
logger.info("redisKey的值 ==================》 " + redisService.getValue("redisKey"));
try {
Thread.sleep(5000);
logger.info("等待5秒后的值 ==================》 " + redisService.getValue("redisKey"));
} catch (InterruptedException e) {
e.printStackTrace();
}
return userService.getAll();
}
}
5.Redis测试结果
三. 整合PageHelper分页插件
1.在pom.xml文件中引入Pagehelper分页插件
com.github.pagehelper
pagehelper-spring-boot-starter
1.4.1
helperDialect :分页插件会自动检测当前的数据库链接,自动选择合适的分页方式。 你可以配置helperDialect 属性来指定分页插件使用哪种方言。配置时,可以使用下面的缩写值:oracle , mysql , mariadb , sqlite , hsqldb , postgresql , db2 , sqlserver , informix , h2 , sqlserver2012 , derby特别注意:使用 SqlServer2012 数据库时,需要手动指定为 sqlserver2012 ,否则会使用 SqlServer2005 的方式进行分页。你也可以实现 AbstractHelperDialect ,然后配置该属性为实现类的全限定名称即可使用自定义的实现方法。
offsetAsPageNum :默认值为 false ,该参数对使用 RowBounds 作为分页参数时有效。 当该参数设置为true 时,会将 RowBounds 中的 offset 参数当成 pageNum 使用,可以用页码和页面大小两个参数进行分页。
rowBoundsWithCount :默认值为 false ,该参数对使用 RowBounds 作为分页参数时有效。 当该参数设置为 true 时,使用 RowBounds 分页会进行 count 查询。
pageSizeZero :默认值为 false ,当该参数设置为 true 时,如果 pageSize=0 或者 RowBounds.limit =0 就会查询出全部的结果(相当于没有执行分页查询,但是返回结果仍然是 Page 类型)。
reasonable :分页合理化参数,默认值为 false 。当该参数设置为 true 时, pageNum<=0 时会查询第一页, pageNum>pages (超过总数时),会查询最后一页。默认 false 时,直接根据参数进行查询。
params :为了支持 startPage(Object params) 方法,增加了该参数来配置参数映射,用于从对象中根据属性名取值, 可以配置 pageNum,pageSize,count,pageSizeZero,reasonable ,不配置映射的用默认值, 默认值为pageNum=pageNum;pageSize=pageSize;count=countSql;reasonable=reasonable;pageSizeZero=pageSizeZero。
supportMethodsArguments :支持通过 Mapper 接口参数来传递分页参数,默认值 false ,分页插件会从查询方法的参数值中,自动根据上面 params 配置的字段中取值,查找到合适的值时就会自动分页。 使用方法可以参考测试代码中的 com.github.pagehelper.test.basic 包下的 ArgumentsMapTest 和ArgumentsObjTest 。
autoRuntimeDialect :默认值为 false 。设置为 true 时,允许在运行时根据多数据源自动识别对应方言的分页 (不支持自动选择 sqlserver2012 ,只能使用 sqlserver ),用法和注意事项参考下面的场景五。
closeConn :默认值为 true 。当使用运行时动态数据源或没有设置 helperDialect 属性自动获取数据库类型时,会自动获取一个数据库连接, 通过该属性来设置是否关闭获取的这个连接,默认 true 关闭,设置为false 后,不会关闭获取的连接,这个参数的设置要根据自己选择的数据源来决定
2.分页测试类
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.iflytek.bim.cop.domain.entity.User;
import com.iflytek.bim.cop.api.IUserService;
import com.iflytek.bim.cop.service.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.List;
@Controller
public class TestController {
private static final Logger logger = LoggerFactory.getLogger(TestController.class);
@Autowired
private IUserService userService;
@Autowired
private IRedisService redisService;
@ResponseBody
@RequestMapping("/")
public PageInfo test() {
PageHelper.startPage(1, 10);
//存入redis并设置10秒的有效期
redisService.setValue("redisKey", 1, 3, 2);
logger.info("redisKey的值 ==================》 " + redisService.getValue("redisKey"));
try {
Thread.sleep(5000);
logger.info("redisKey 等待5秒后的值 ==================》 " + redisService.getValue("redisKey"));
} catch (InterruptedException e) {
e.printStackTrace();
}
List userList = userService.getAll();
PageInfo pageInfo = new PageInfo<>(userList);
return pageInfo;
}
}
3.分页测试,浏览器访问http://localhost:8020/
注:注意com.github.pagehelper的版本,可能会在启动的时候报错
┌──->──┐ | com.github.pagehelper.autoconfigure.PageHelperAutoConfiguration └──<-──┘
五.集成kafka
1.引入kafka maven依赖
org.springframework.kafka
spring-kafka
com.alibaba
fastjson
1.2.47
commons-lang
commons-lang
2.4
provided
2.服务器安装Kafka
(1).下载kafka压缩包
https://kafka.apache.org/downloads
(2).安装Kafka
tar -zxvf kafka_2.13-3.0.0.tgz
(3).修改kafka的配置
#在公司内网部署 kafka 集群只需要用到 listeners,内外网需要作区分时 才需要用到advertised.listeners listeners=PLAINTEXT://127.0.0.1:9092 advertised.listeners=PLAINTEXT://127.0.0.1:9092 log.dirs=/usr/local/kafka_2.11-0.9.0.1/kafka-logs//
(4).修改kafka内置zookeeper的配置
dataDir=/usr/local/kafka/zookeeper dataLogDir=/usr/local/kafka/zookeeper-logs # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=100 tickTime=2000 initLimit=10 syncLimit=5 # Disable the admi
(5).开启zookeeper和kafka
bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
为了方便可以自己创建一个启动脚本 进入kafka目录下 输入命令:vi kafkaStart.sh 添加内容为: #!/bin/bash #启动zookeeper /DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties & sleep 3 #默默等3秒后执行 #启动kafka /DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties &
启动结果
(6).创建topic主题
./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 -partitions 3 -replication-factor 1
(7).启动生产者连接主题
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
(8).启动消费者连接主题
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
3.springboot开始集成Kafka,添加配置
#kafka spring.applicationname=kafka-tutorial # 指定kafka 代理地址,可以多个 spring.kafka.bootstrap-servers=172.31.111.11:9092 spring.kafka.producer.retries: 0 # 每次批量发送消息的数量 spring.kafka.producer.batch-size: 16384 # 缓存容量 spring.kafka.producer.buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer # 指定默认消费者group id spring.kafka.consumer.group-id=demo spring.kafka.consumer.auto-commit-interval=100 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 指定listener容器中的线程数,用于提高并发量 spring.kafka.listener.concurrency=3
4.创建测试bean包,创建kafka生产者与消费者
(1).创建生产者KafkaProduction
import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Component public class KafkaProduction{ private Logger logger = LoggerFactory.getLogger(KafkaProduction.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(T obj,String topics) { String jsonObj = JSON.toJSONString(obj); logger.info("----kafka---- message = {}", jsonObj); //发送消息 ListenableFuture > future = kafkaTemplate.send(topics, jsonObj); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { logger.info("KafkaProduction: kafka to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功消费 //TODO 业务处理 logger.info("KafkaProduction: The message has be sent successfully:"); logger.info("KafkaProduction: =============== result: " + stringObjectSendResult.toString()); } }); } }
(2).创建KafkaTopicsConstant常量类
public interface IKafkaSenderService {
public void send();
}
(3).创建消费者KafkaConsumer
import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class KafkaConsumer {
private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = KafkaTopicsConstant.TEST_TOPICS)
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
logger.info("KafkaConsumer 接收: ================= Topic:" + record.topic());
logger.info("KafkaConsumer 接收: ================= Record:" + record);
logger.info("KafkaConsumer 接收: ================= Message:" + message);
}
}
}
(4).发送实现
import com.iflytek.bim.cop.component.KafkaProduction;
import com.iflytek.bim.cop.contant.KafkaTopicsConstant;
import com.iflytek.bim.cop.domain.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
public class KafkaSenderServiceImpl implements IKafkaSenderService {
@Autowired
private KafkaProduction kafkaProduction;
@Override
public void send() {
User user = new User();
user.setLoginName("I am is a panda");
user.setPassword("5588996633");
kafkaProduction.send(user, KafkaTopicsConstant.TEST_TOPICS);
}
}
(5).测试Kafka
@ResponseBody
@RequestMapping("/")
public void kafkatest() {
kafkaSenderService.send();
}
(6).Kafka测试结果
六.集成SpringCloud


![[全]Springboot分布式框架搭建+Kafka+redis+分页+shiro+Swagger2+Feign+一些坑处理+微服务 [全]Springboot分布式框架搭建+Kafka+redis+分页+shiro+Swagger2+Feign+一些坑处理+微服务](http://www.mshxw.com/aiimages/31/751807.png)
