用于解决kafka消费者端入库无法使用事务注解的情况,异常如下:
//出现问题打印的异常信息
io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread.
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired.intercept(TransactionalInterceptorRequired.java:30)
at io.quarkus.narayana.jta.runtime.interceptor.TransactionalInterceptorRequired_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:41)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at com.zy.push.resource.ConsumerResource_Subclass.updateError(Unknown Source)
at com.zy.push.resource.ConsumerResource.doRecord(ConsumerResource.java:94)
at com.zy.push.resource.ConsumerResource.consume(ConsumerResource.java:59)
at com.zy.push.resource.ConsumerResource_ClientProxy.consume(Unknown Source)
at com.zy.push.resource.ConsumerResource_SmallRyeMessagingInvoker_consume_15a874d2b6c8dbaf775f4720f94a8d81d7386f7c.invoke(Unknown Source)
at io.smallrye.reactive.messaging.providers.AbstractMediator.invoke(AbstractMediator.java:91)
at io.smallrye.reactive.messaging.providers.SubscriberMediator.lambda$processMethodReturningVoid$6(SubscriberMediator.java:171)
at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onSubscribe(MultiFlatMapOp.java:601)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:26)
at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:163)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:193)
at io.smallrye.mutiny.operators.multi.MultiMapOp$MapProcessor.onItem(MultiMapOp.java:50)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:99)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:99)
at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:85)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:230)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
at io.smallrye.mutiny.operators.multi.MultiOnItemInvoke$MultiOnItemInvokeProcessor.onItem(MultiOnItemInvoke.java:41)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:230)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:607)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:93)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.access$100(UniCreateFromKnownItem.java:26)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:74)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onSubscribe(MultiFlatMapOp.java:601)
at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:26)
at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:163)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:193)
at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onItem(MultiOperatorProcessor.java:100)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onNext(UniOnItemTransformToMulti.java:62)
at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:85)
at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription.run(KafkaRecordStreamSubscription.java:198)
at io.smallrye.reactive.messaging.kafka.impl.KafkaRecordStreamSubscription.lambda$dispatch$4(KafkaRecordStreamSubscription.java:178)
at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)
at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
如果是gradle项目,在build.gradle中添加依赖
//原来的阻塞式持久化需要引入的依赖
implementation 'io.quarkus:quarkus-jdbc-mysql'
implementation 'io.quarkus:quarkus-hibernate-orm-panache'
//将上述拓展更改为以下依赖
implementation 'io.quarkus:quarkus-hibernate-reactive-panache'
implementation 'io.quarkus:quarkus-jdbc-mysql'
implementation 'io.quarkus:quarkus-reactive-mysql-client'

如果是maven项目,在pom.xml中添加依赖
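<!-- 原来的阻塞式持久化需要引入的依赖 -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-hibernate-orm</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>
<!-- 将上述拓展更改为以下依赖 -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-hibernate-reactive-panache</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-reactive-mysql-client</artifactId>
</dependency>

配置文件application.properties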
#原来的阻塞式持久化需要的数据库配置
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/message_platform?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
quarkus.datasource.jdbc.driver=com.mysql.cj.jdbc.Driver
quarkus.datasource.username=root
quarkus.datasource.password=123456
quarkus.hibernate-orm.physical-naming-strategy=org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy
#将上述数据库配置更改为以下配置
quarkus.datasource.db-kind=mysql
quarkus.datasource.reactive.url=vertx-reactive:mysql://localhost:3306/message_platform?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
quarkus.datasource.username=root
quarkus.datasource.password=123456
quarkus.hibernate-orm.physical-naming-strategy=org.hibernate.boot.model.naming.CamelCaseToUnderscoresNamingStrategy

XXXRepository
//原来引入的PanacheRepository的包为orm目录下的
import io.quarkus.hibernate.orm.panache.PanacheRepository;
//需要修改上述引入的PanacheRepository的包到reactive目录下
import io.quarkus.hibernate.reactive.panache.PanacheRepository;

CRUD使用
//需要对返回的对象使用Uni进行包装(参考文档见下方链接)
//原有增删改的注解@Transactional需要修改为@ReactiveTransactional
//@ReactiveTransactional注解等同于在方法中使用Panache.withTransaction()

//更新操作
//以下展示根据sql,将要更新的值进行更新的操作
@ReactiveTransactional
public Uni<Void> update(long id, String status, Date date) {
    return repository.update("status = ?1, sendDate = ?2 where id = ?3", status, date, id)
            .map(entity -> null);
}

//以下展示根据id查询,如果不为空,将要更新的值进行更新的操作
@ReactiveTransactional
public Uni<MessageDetails> update(long id, String status, Date date) {
    return repository.findById(id)
            .onItem().ifNotNull().invoke(entity -> {
                entity.status = status;
                entity.sendTime = date;
            });
}

//新增操作
@ReactiveTransactional
public Uni<MessageDetails> persist(MessageDetails entity) {
    //返回的是对象
    return repository.persist(entity);
}

//删除操作
@ReactiveTransactional
public Uni<Boolean> delete(Long id) {
    return repository.deleteById(id);
}
https://quarkus.io/guides/hibernate-reactive-panache



