Java事務之八——分布式事務(Spring+JTA+Atomikos+Hibernate+JMS)

 2023-12-07 阅读 24 评论 0

摘要:  在本系列先前的文章中,我們主要講解了JDBC對本地事務的處理,本篇文章將講到一個分布式事務的例子。 ?   請通過以下方式下載github源代碼: git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git ?   本地事務和分布式事務

  在本系列先前的文章中,我們主要講解了JDBC對本地事務的處理,本篇文章將講到一個分布式事務的例子。

?

  請通過以下方式下載github源代碼:

git clone https://github.com/davenkin/jta-atomikos-hibernate-activemq.git

?

  本地事務和分布式事務的區別在于:本地事務只用于處理單一數據源事務(比如單個數據庫),分布式事務可以處理多種異構的數據源,比如某個業務操作中同時包含了JDBC和JMS或者某個操作需要訪問多個不同的數據庫。

?

  Java通過JTA完成分布式事務,JTA本身只是一種規范,不同的應用服務器都包含有自己的實現(比如JbossJTA),同時還存在獨立于應用服務器的單獨JTA實現,比如本篇中要講到的Atomikos。對于JTA的原理,這里不細講,讀者可以通過這篇文章了解相關知識。

?

  在本篇文章中,我們將實現以下一個應用場景:你在網上購物,下了訂單之后,訂單數據將保存在系統的數據庫中,同時為了安排物流,訂單信息將以消息(Message)的方式發送到物流部門以便送貨。

?

  以上操作同時設計到數據庫操作和JMS消息發送,為了使整個操作成為一個原子操作,我們只能選擇分布式事務。我們首先設計一個service層,定義OrderService接口:

package davenkin;public interface OrderService {public void makeOrder(Order order);
}

?

  為了簡單起見,我們設計一個非常簡單的領域對象Order:

@XmlRootElement(name = "Order")
@XmlAccessorType(XmlAccessType.FIELD)
public class Order {@XmlElement(name = "Id",required = true)private long id;@XmlElement(name = "ItemName",required = true)private String itemName;@XmlElement(name = "Price",required = true)private double price;@XmlElement(name = "BuyerName",required = true)private String buyerName;@XmlElement(name = "MailAddress",required = true)private String mailAddress;public Order() {}

?

  為了采用JAXB對Order對象進行Marshal和Unmarshal,我們在Order類中加入了JAXB相關的Annotation。 我們將使用Hibernate來完成數據持久化,然后使用Spring提供的JmsTemplate將Order轉成xml后以TextMessage的形式發送到物流部門的ORDER.QUEUE中。

?

(一)準備數據庫

  為了方便,我們將采用Spring提供的embedded數據庫,默認情況下Spring采用HSQL作為后臺數據庫,雖然在本例中我們將采用HSQL的非XA的DataSource,但是通過Atomikos包裝之后依然可以參與分布式事務。

  SQL腳本包含在createDB.sql文件中:

CREATE TABLE USER_ORDER(
ID INT NOT NULL,
ITEM_NAME VARCHAR (100) NOT NULL UNIQUE,
PRICE DOUBLE NOT NULL,
BUYER_NAME CHAR (32) NOT NULL,
MAIL_ADDRESS VARCHAR(500) NOT NULL,
PRIMARY KEY(ID)
);

?

  在Spring中配置DataSource如下:

    <jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:createDB.sql"/></jdbc:embedded-database>

?

(二)啟動ActiveMQ

  我們將采用embedded的ActiveMQ,在測試之前啟動ActiveMQ提供的BrokerService,在測試執行完之后關閉BrokerService。

  @BeforeClasspublic static void startEmbeddedActiveMq() throws Exception {broker = new BrokerService();broker.addConnector("tcp://localhost:61616");broker.start();}@AfterClasspublic static void stopEmbeddedActiveMq() throws Exception {broker.stop();}

?

(三)實現OrderService

  創建一個DefaultOrderService,該類實現了OrderService接口,并維護一個JmsTemplate和一個Hibernate的SessionFactory實例變量,分別用于Message的發送和數據庫處理。

package davenkin;import org.hibernate.SessionFactory;
import org.hibernate.classic.Session;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.annotation.Transactional;public class DefaultOrderService  implements OrderService{private JmsTemplate jmsTemplate;private SessionFactory sessionFactory;@Override@Transactionalpublic void makeOrder(Order order) {Session session = sessionFactory.getCurrentSession();session.save(order);jmsTemplate.convertAndSend(order);}@Requiredpublic void setJmsTemplate(JmsTemplate jmsTemplate) {this.jmsTemplate = jmsTemplate;}@Requiredpublic void setSessionFactory(SessionFactory sessionFactory) {this.sessionFactory = sessionFactory;}
}

?

(四)創建Order的Mapping配置文件

<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN""http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd"><hibernate-mapping><class name="davenkin.Order" table="USER_ORDER"><id name="id" type="long"><column name="ID" /><generator class="increment" /></id><property name="itemName" type="string"><column name="ITEM_NAME" /></property><property name="price" type="double"><column name="PRICE"/></property><property name="buyerName" type="string"><column name="BUYER_NAME"/></property><property name="mailAddress" type="string"><column name="MAIL_ADDRESS"/></property></class>
</hibernate-mapping>

?

(五)配置Atomikos事務

  在Spring的IoC容器中,我們需要配置由Atomikos提供的UserTransaction和TransactionManager,然后再配置Spring的JtaTransactionManager:

  <bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" init-method="init" destroy-method="shutdownForce"><constructor-arg><props><prop key="com.atomikos.icatch.service">com.atomikos.icatch.standalone.UserTransactionServiceFactory</prop></props></constructor-arg></bean><bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close" depends-on="userTransactionService"><property name="forceShutdown" value="false" /></bean><bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" depends-on="userTransactionService"><property name="transactionTimeout" value="300" /></bean><bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="userTransactionService"><property name="transactionManager" ref="atomikosTransactionManager" /><property name="userTransaction" ref="atomikosUserTransaction" /></bean><tx:annotation-driven transaction-manager="jtaTransactionManager" />

?

(六)配置JMS

  對于JMS,為了能使ActiveMQ加入到分布式事務中,我們需要配置ActiveMQXAConnectionFactory,而不是ActiveMQConnectionFactory,然后再配置JmsTemplate,此外還需要配置MessageConvertor在Order對象和XML之間互轉。

    <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616" /></bean><bean id="amqConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init"><property name="uniqueResourceName" value="XAactiveMQ" /><property name="xaConnectionFactory" ref="jmsXaConnectionFactory" /><property name="poolSize" value="5"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="amqConnectionFactory"/><property name="receiveTimeout" value="2000" /><property name="defaultDestination" ref="orderQueue"/><property name="sessionTransacted" value="true" /><property name="messageConverter" ref="oxmMessageConverter"/></bean><bean id="orderQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="ORDER.QUEUE"/></bean><bean id="oxmMessageConverter"class="org.springframework.jms.support.converter.MarshallingMessageConverter"><property name="marshaller" ref="marshaller"/><property name="unmarshaller" ref="marshaller"/></bean><oxm:jaxb2-marshaller id="marshaller"><oxm:class-to-be-bound name="davenkin.Order"/></oxm:jaxb2-marshaller>

?

(七)測試

  在測試中,我們首先通過(二)中的方法啟動ActiveMQ,再調用DefaultOrderService,最后對數據庫和QUEUE進行驗證:

   @Testpublic void makeOrder(){orderService.makeOrder(createOrder());JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);assertEquals(1, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));String dbItemName = jdbcTemplate.queryForObject("SELECT ITEM_NAME FROM USER_ORDER", String.class);String messageItemName = ((Order) jmsTemplate.receiveAndConvert()).getItemName();assertEquals(dbItemName, messageItemName);}@Test(expected = IllegalArgumentException.class)public void failToMakeOrder(){orderService.makeOrder(null);JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);assertEquals(0, jdbcTemplate.queryForInt("SELECT COUNT(*) FROM USER_ORDER"));assertNull(jmsTemplate.receiveAndConvert());}

?

轉載于:https://www.cnblogs.com/davenkin/archive/2013/03/19/java-tranaction-8.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/5/193020.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息