栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kafka消费者端入库无法使用事务注解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kafka消费者端入库无法使用事务注解

Quarkus使用Reactive Hibernate对消息消费进行非阻塞式持久化 使用该技术的目的

用于解决kafka消费者端入库无法使用事务注解的情况,异常如下:

//出现问题打印的异常信息
io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:30)
    at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source)
    at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
    at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
    at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
    at com.zy.push.resource.ConsumerResource_Subclass.updateError(Unknown Source)
    at com.zy.push.resource.ConsumerResource.doRecord(ConsumerResource.java:94)
    at com.zy.push.resource.ConsumerResource.consume(ConsumerResource.java:59)
    at com.zy.push.resource.ConsumerResource_ClientProxy.consume(Unknown Source)
    at com.zy.push.resource.ConsumerResource_SmallRyeMessagingInvoker_consume_15a874d2b6c8dbaf775f4720f94a8d81d7386f7c.invoke(Unknown Source)
    at io.smallrye.reactive.messaging.providers.AbstractMediator.invoke(AbstractMediator.java:91)
    at io.smallrye.reactive.messaging.providers.SubscriberMediator.lambda$processMethodReturningVoid$6(SubscriberMediator.java:171)
    at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onSubscribe(MultiFlatMapOp.java:601)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:26)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:163)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:193)
    at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:99)
    at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:99)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:85)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:230)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
    at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
    at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:41)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:230)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:93)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.access$100(UniCreateFromKnownItem.java:26)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onSubscribe(MultiFlatMapOp.java:601)
    at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:26)
    at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:163)
    at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:193)
    at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
    at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
    at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onNext(UniOnItemTransformToMulti.java:62)
    at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
    at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:85)
    at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription.run(KafkaRecordStreamSubscription.java:198)
    at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription.lambda$dispatch$4(KafkaRecordStreamSubscription.java:178)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)
如果是gradle项目,在build.gradle中添加依赖
//原来的阻塞式持久化需要引入的依赖
implementation 'io.quarkus:quarkus-jdbc-mysql'
implementation 'io.quarkus:quarkus-hibernate-orm-panache'

//将上述拓展更改为以下依赖
implementation 'io.quarkus:quarkus-hibernate-reactive-panache'
implementation 'io.quarkus:quarkus-jdbc-mysql'
implementation 'io.quarkus:quarkus-reactive-mysql-client'
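如果是maven项目,在pom.xml中添加依赖
<!-- 原来的阻塞式持久化需要引入的依赖 -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>

<!-- 将上述拓展更改为以下依赖 -->
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-reactive-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-reactive-mysql-client</artifactId>
</dependency>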
配置文件application.properties
#原来的阻塞式持久化需要的数据库配置
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/message_platform?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
quarkus.datasource.jdbc.driver=com.mysql.cj.jdbc.Driver
quarkus.datasource.username=root
quarkus.datasource.password=123456
quarkus.hibernate-orm.physical-naming-strategy=org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy

#将上述数据库配置更改为以下配置
quarkus.datasource.db-kind=mysql
quarkus.datasource.reactive.url=vertx-reactive:mysql://localhost:3306/message_platform?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
quarkus.datasource.username=root
quarkus.datasource.password=123456
quarkus.hibernate-orm.physical-naming-strategy=org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy
XXXRepository
//原来引入的PanacheRepository的包为orm目录下的
import io.quarkus.hibernate.orm.panache.PanacheRepository;

//需要修改上述引入的PanacheRepository的包到reactive目录下
import io.quarkus.hibernate.reactive.panache.PanacheRepository;
CRUD使用
//需要对返回的对象使用Uni进行包装

//原有增删改的注解@Transactional需要修改为@ReactiveTransactional
//@ReactiveTransactional注解等同于在方法中使用Panache.withTransaction()

//更新操作
/**
 * Updates the status and send date of one record with a single bulk update
 * statement, without loading the entity first.
 *
 * @param id     primary key of the row to update
 * @param status new status value
 * @param date   new send date
 * @return a {@code Uni} that completes (with a {@code null} item) once the
 *         update statement has been executed
 */
@ReactiveTransactional
public Uni<Void> update(long id, String status, Date date) {
    return repository.update("status = ?1, sendDate = ?2 where id = ?3",
                             status, date, id)
        // update(...) emits the number of affected rows, not an entity; callers
        // only need the completion signal, so the count is discarded.
        .replaceWithVoid();
}
/**
 * Looks the record up by id and, when one is found, overwrites its status and
 * send time in place.
 *
 * <p>No explicit persist call is made here; presumably the changes to the
 * loaded entity are written when the surrounding reactive transaction
 * commits (verify against the Hibernate Reactive Panache guide).
 *
 * <p>Assumes {@code repository} is a {@code PanacheRepository<MessageDetails>},
 * as suggested by the persist snippet in this file; confirm.
 *
 * @param id     primary key of the row to update
 * @param status new status value
 * @param date   new send time
 * @return a {@code Uni} emitting the entity after modification, or a
 *         {@code null} item when no row has the given id
 */
@ReactiveTransactional
public Uni<MessageDetails> update(long id, String status, Date date) {
    return repository.findById(id)
            // ifNotNull() skips the mutation when findById emits null (no such row).
            .onItem().ifNotNull().invoke(entity -> {
                entity.status = status;
                // NOTE(review): another snippet in this file writes "sendDate" in its
                // update query; confirm which field name the entity declares.
                entity.sendTime = date;
            });
}

/**
 * Inserts a new record inside a reactive transaction.
 *
 * @param entity the entity to store
 * @return a {@code Uni} emitting the persisted entity object (not a row count)
 */
@ReactiveTransactional
public Uni<MessageDetails> persist(MessageDetails entity) {
    return repository.persist(entity);
}

/**
 * Deletes the record with the given id inside a reactive transaction.
 *
 * @param id primary key of the row to delete
 * @return a {@code Uni} emitting {@code true} when a row was deleted and
 *         {@code false} when no row had that id (per the Panache
 *         {@code deleteById} contract)
 */
@ReactiveTransactional
public Uni<Boolean> delete(Long id) {
    return repository.deleteById(id);
}


参考文档

https://quarkus.io/guides/hibernate-reactive-panache

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/831844.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号