[Java] JMS(Java Message Service) 란?
이 문서는 개인적인 목적이나 배포하기 위해서 복사할 수 있다. 출력물이든 디지털 문서든 각 복사본에 어떤 비용도 청구할 수 없고 모든 복사본에는 이 카피라이트 문구가 있어야 한다.
[Spring 레퍼런스] 22장 JMS (Java Message Service)
22. JMS (Java Message Service)
22.1 소개
스프링은 JDBC API를 통합했듯이 JMS API도 사용하기 쉽게 JMS 통합 프레임워크를 제공한다.
JMS는 기능적으로 대략 두 부분으로 나눌 수 있는데 메시지의 생산(production)과 소비(consumption)이다. JmsTemplate 클래스는 메시지 생산과 동기적인 메시지 수신에 사용한다. Java EE의 메시지주도 빈(bean) 방식과 유사한 비동기적인 수신에 대해서 스프링은 메시지주도 POJO(MDP, Message-Driven POJOs)를 생성하는데 사용하는 다수의 메시지 리스너 컨테이너를 제공한다.
org.springframework.jms.core 패키지는 JMS 사용에 핵심적인 기능을 제공한다. 이 패키지에 JdbcTemplate가 JDBC에 하듯이 리소스의 생성과 소멸을 다뤄줌으로써 JMS를 간편하게 사용하게 하는 JMS 템플릿 클래스가 있다. 스프링 템플릿 클래스와 공통되는 설계 원리는 더 세련된 사용방법을 위해서 공통되는 작업을 수행하는 헬퍼 메서드를 제공하고 작업을 처리하는 부분을 콜백 인터페이스를 구현한 사용자에게 맡기는 것이다. 이 클래스는 동기적으로 메시지를 보내고 메시지를 소비하고 JMS 세션와 메시지 생산자(producer)를 사용자에게 노출하는 다양한 편의 메서드를 제공한다.
org.springframework.jms.support 패키지는 JMSException 변환 기능을 제공한다. 이 변환기능은 체크드 JMSException 계층을 동일한 계층의 언체크드 예외로 변환한다. 프로바이더에 특화된 체크드 javax.jms.JMSException의 하위클래스가 있다면 이 예외는 언체크드 UncategorizedJmsException로 감싸진다.
org.springframework.jms.support.converter 패키지는 Java 객체와 JMS 메시지간에 변환을 하는 MessageConverter 추상화를 제공한다.
org.springframework.jms.support.destination 패키지는 JNDI에 저장된 목적지에 대한 서비스 로케이터를 제공하듯이 JMS 목적지를 관리하는 다양한 전략을 제공한다.
마지막으로 org.springframework.jms.connection 패키지는 독립 어플리케이션에서 사용하는데 적합한 ConnectionFactory의 구현체를 제공한다. 이 패키지에는 JMS를 위한 스프링 PlatformTransactionManager의 구현체도 있다.(이 이름은 JmsTransactionManager이다) 이는 JMS를 트랜잭션 리소스처럼 스프링의 트랜잭션 관리 메카니즘으로 어려움없이 통합할 수 있게 한다.
22.2 스프링 JMS 사용
22.2.1 JmsTemplate
JmsTemplate 클래스는 JMS 코어 패키지의 핵심 클래스다. JmsTemplate 클래스는 메시지를 보내거나 동기적으로 메시지를 받을 때 리소르를 생성하고 제거하는 처리를 해주므로 JMS의 사용을 간단하게 해준다.
JmsTemplate을 사용하는 코드는 고수준의 계약을 명확하게 정의해주는 콜백 인터페이스만 구현하면 된다. MessageCreator 콜백 인터페이스는 JmsTemplate에서 코드를 호출해서 제공한 Session의 메시지를 생성한다. JMS API의 더 복잡한 사용을 위해서 SessionCallback 콜백은 JMS 세션을 가진 사용자와 Session과 MessageProducer 쌍을 노출하는 ProducerCallback 콜백을 노출한다.
JMS API는 두 종류의 send 메서드를 노출하는데 하나는 전송모드, 중요도, 서비스품질 (QOS, Quality of Service)로 time-to-live를 받고 하나는 QOS 파라미터를 받지 않고 기본값을 사용한다. JmsTemplate에 다수의 send 메서드가 있으므로 QOS 파라미터를 설정하는 것은 다수의 send 메서드의 중복을 피하기 위해 빈 프로퍼티로 노출된다. 비슷하게 동기적인 receive 호출의 타임아웃값은 setReceiveTimeout 프로퍼티로 설정한다.
일부 JMS 프로바이더는 관리상(administratively) ConnectionFactory의 설정으로 기본 QOS 값을 설정할 수 있게 한다. MessageProducer의 send 메서드 send(Destination destination, Message message)를 호출해서 JMS 명세서에 나온 것과는 다른 QOS 기본값을 사용하게 할 것이다. 일관된 QOS 값의 관리를 위해서 JmsTemplate은 특히 isExplicitQosEnabled 프로퍼티를 true로 설정해서 자신만의 QOS 값을 사용하도록 되어야 한다.
Note
JmsTemplate 클래스의 인스턴스는 설정되고 나면 스레드 세이프하다. 이는 하나의 JmsTemplate 인스턴스를 설정해서 공유되는 참조로 여러 협력객체에 안전하게 주입할 수 있다는 의미이므로 중요하다. 좀 더 분명하게 JmsTemplate은 상태를 가지지만(ConnectionFactory에 대한 참조를 유지한다는 점에서) 이 상태는 대화식 상태는 아니다.
22.2.2 연결
JmsTemplate는 ConnectionFactory에 대한 참조를 필요로 한다.
ConnectionFactory는 JMS 명세에 포함되어 있고 JMS와 동작하는 진입점으로 제공된다. 클라이언트 어플리케이션이 ConnectionFactory를 JMS 프로바이더의 연결을 생성하는 팩토리로 사용하고 다양한 설정 파라미터(대부분은 SSL 설정옵션 같은 벤더에 특화된 설정이다.)를 캡슐화한다.
EJB에서 JMS를 사용하는 경우 벤더가 JMS 인터페이스의 구현체를 제공하므로 구현체가 선언적인 트랜잭션 관리에 참여하고 연결과 세션의 풀링을 수행한다. 이 구현체를 사용하려면 Java EE 컨테이너는 보통 EJB내의 resource-ref로 JMS 커넥션 팩토리나 서블릭 배포 디스크립터를 선언하도록 요구한다. EJB에서 이 기능과 JmsTemplate를 함께 사용하려면 클라이언트 어플리케이션이 ConnectionFactory의 관리된 구현체를 참조한다는 것을 보장해야 한다.
22.2.2.1 메시징 리소스 캐싱
표준 API에는 다수의 중간객체의 생성하는 API가 포함되어 있다. 메시지를 보내려면 다음의 ‘API’가 실행된다.
1
ConnectionFactory->Connection->Session->MessageProducer->send
ConnectionFactory와 Send 작업사이에는 생성되고 소멸되는 세개의 중간 객체가 있다. 리소스의 사용을 최적화하고 성능을 높히기 위해 IConnectionFactory의 두 구현체를 제공한다.
22.2.2.2 SingleConnectionFactory
스프링은 ConnectionFactory 인터페이스의 구현체로 SingleConnectionFactory를 제공하고 SingleConnectionFactory는 모든 createConnection() 호출에서 같은 Connection을 반환하고 close() 호출은 무시할 것이다. 이는 다수의 트랜잭션이 걸친 여러 JmsTemplate 호출에 같은 연결을 사용할 수 있으므로 테스트와 독립적인 환경에 유용하다.
SingleConnectionFactory는 일반적으로 JNDI에서 받은 표준 ConnectionFactory에 대한 참조를 받는다.
22.2.2.3 CachingConnectionFactory
CachingConnectionFactory는 SingleConnectionFactory의 기능을 확장하고 Session, MessageProducer, MessageConsumer에 캐싱을 추가한다. 초기 캐시크기는 1이고 캐시되는 세션의 수를 증가시키려면 SessionCacheSize 프로퍼티를 사용해라. 실제 캐시된 세션의 수는 승인 모드(acknowledgment mode)에 기반해서 캐시된 세션의 수보다 많을 것이므로 SessionCacheSize를 1로 설정한 경우(각 AcknowledgementMode마다 1) 캐시된 세션 인스턴스는 4까지 될 수 있다. MessageProducer와 MessageConsumer는 이들이 소유한 세션에 캐시되고 캐싱할 때 생산자(producer)와 소비자(consumer)의 유일한 프로퍼티를 고려한다. MessageProducer는 목적지에 기반해서 캐시된다. MessageConsumer는 목적지, 셀렉터, noLocal deliver 플래그, (durable consumer를 생성한다면)장기 구독 이름(durable subscription name)로 구성된 키에 기반한 캐시된다.
22.2.3 목적지 관리
ConnectionFactory같은 목적지는 JNDI로 저장하고 획득할 수 있는 JMS가 관리하는 객체다. 스프링 어플리케이션 컨텍스트를 설정할 때 JMS 목적지에 대한 객체의 참조에 의존성 주입을 하기 위해 JNDI 팰토리 클래스 JndiObjectFactoryBean / 를 사용할 수 있다. 하지만 어플리케이션에 아주 많은 목적지가 있거나 JMS 프로바이더에 특화되 고급 목적지 관리기능이 있다면 이 전력을 다루기 어려울 수 있다. 고급 목적지 관리에는 동적 목적지 생성이나 목적지의 계층적 네임스페이스 지원등이 있다. JmsTemplate은 목적지 처리를 JMS 목적지 객체나 DestinationResolver 인터페이스의 구현체에 위임한다. JmsTemplate이 기본 구현체로 DynamicDestinationResolver를 사용하고 동적 목적지를 처리한다. 제공된 JndiDestinationResolver는 JNDI에 있는 목적지의 서비스 로케이터처럼 동작하고 선택적으로 DynamicDestinationResolver의 동작으로 장애 복구를 한다.
드물기는 하지만 JMS 어플리케이션이 상요하는 목적지를 런타임에서만 알 수 있는 경우에는 어플리케이션을 배포할 때 생성할 수 없다. 이러한 상황은 상호장용하는 시스템 컴포넌트가 잘 알려진 작명관례에 따라 런타임에서 목적지를 생성하고 이 시스템 컴포넌트간에 어플리케이션 로직을 공유하기 때문에 발생한다. 동적 목적지생성이 JMS 명세에 나와있지 않지만 대부분의 벤더는 이 기능을 제공하고 있다. 동적 목적 목적지는 임시 목적지와 구별할 수 있는 사용자가 정의한 이름으로 생성되고 JNDI에 등록되지 않기도 한다. 목적지와 관련된 프로퍼티가 벤더에 특화되어 있으므로 동적 목적지를 생성하는 API는 프로바이더마다 다양하다. 하지만 벤더가 만든 간단한 구현체는 JMS 명세의 경고를 무시하고 TopicSession 메서드 createTopic(String topicName)나 QueueSession 메서드 createQueue(String queueName)를 기본 목적지 프로퍼티로 새로운 목적지를 생성하는데 사용한다. 벤더 구현체에 따라서는 DynamicDestinationResolver가 목적지를 처리하는 것이 아닌 물리적인 목적지를 생성하기도 한다.
불리언 프로퍼티인 pubSubDomain를 JmsTemplate이 어떤 JMS 도메인을 사용하는지 설정하는데 사용한다. 이 프로퍼티의 기본값은 false로 point-to-point 도메인인 Queue를 사용할 것임을 나타낸다. JmsTemplate이 사용하는 이 프로퍼티는 DestinationResolver 인터페이스의 구현체로 동적 목적지 처리의 동작을 결정한다.
defaultDestination 프로퍼티를 사용해서 기본 목적지로 JmsTemplate를 설정할 수도 있다. 기본 목적지는 목적지를 명시하지 않은 보내기/받기 작업에 사용할 것인다.
22.2.4 메시지 리스너 컨테이너
EJB에서 JMS 메시지의 가장 일반적인 경우는 메시지주도 빈(MDB, message-driven beans)를 사용하는 것이다. 스프링은 사용자가 EJB 컨테이너에 의존하지 않는 방법으로 메시지 주도 POJO(MDP, message-driven POJO)를 생성하는 방법을 제공한다. (스프링의 MDP 지원의 자세한 내용은 Section 22.4.2, “비동기 수신 - 메시지 주도 POJO”를 참고해라.)
메시지 리스너 컨테이너는 JMS 메시지 큐에서 메시지를 받고 큐에 주입할 MessageListener를 생성하는데 사용한다. 리스너 컨테이너는 메시지 접수와 처리할 리스너로 디스패치하는 모든 스레드를 책임진다. 메시지 리스너 컨테이너는 MDP와 메시지 프로바이더 사이의 중개자의 역할을 하고 받은 메시지의 등록, 트랙잭셤 참여, 리소스 획득과 릴리즈, 예외 변환등을 담당한다. 이를 통해서 어플리케이션 개발자는 메시지를 받는(그리고 응답을 해야할 수도 있다) 비즈니스 로직(복잡할 수도 있다.)을 작석할 수 있고 JMS 인프라스트럭처의 보일러플레이트에 대한 걱정을 프레임워크에 위임할 수 있다.
스프링에는 두개의 표준 JMS 메시지 리스너 컨테이너가 있고 각 컨테이너는 전문화된 기능을 가진다.
22.2.4.1 SimpleMessageListenerContainer
이 메시지 리스너 컨테이너는 두 표준보다 간단하다. 구동할 때 고정된 수의 JSM 세션과 컨슈머를 생성하고 표준 JMS MessageConsumer.setMessageListener() 메서드를 사용해서 리스너를 등록하고 JMS 프로바이더나 리스너 콜백을 실행하도록 한다. 이 컨테이너는 동적으로 런타임 요구사항에 맞출 수 없고 외부에서 관리하는 트랜잭션에 참여할 수는 없다. 호환성을 위해서 독립적인 JMS 명세서에 아주 가깝지만 보통 Java EE의 JMS 제약사항에는 호환되지 않는다.
22.2.4.2 DefaultMessageListenerContainer
이 메시지 리스너 컨테이너는 가장 많이 사용하는 것 중 하나이다. SimpleMessageListenerContainer와는 달리 런타임 요구사항을 동적으로 맞출 수 있고 외부에서 관리하는 트랜잭션에 참여할 수 있다. JtaTransactionManager를 설정한 경우 받는 각각의 메시지를 XA 트랜잭션에 등록하므로 처리할 때 XA 트랜잭션의 장점을 가질 수 있다. 이 리스너 컨테이너는 JMS 프로바이저의 낮은 요구사항과 트랜잭션 참여같은 고급 기능과 Java EE 환경에 호환성 간의 균형을 잘 맞추고 있다.
22.2.5 Transaction management
스프링은 단일 JMS ConnectionFactory의 트랜잭션을 관리하는 JmsTransactionManager를 제공한다. JmsTransactionManager는 Chapter 11, 트랜잭션 관리에서 설명한 스프링이 관리하는 트랜잭션 기능을 JMS 어플리케이션이 사용하도록 한다. JmsTransactionManager는 로컬 리소스 트랜잭션을 수행하고 JMS 연결/세션 쌍을 지정한 ConnectionFactory에서 스레드로 바인딩한다. JmsTemplate는 자동으로 이러한 트랜잭션이 가능한 리소스를 탐지해서 적절하게 리소스상에서 작업을 실행한다.
Java EE 환경에서 ConnectionFactory는 연결과 세션의 풀을 사용하므로 이러한 리소스를 트랜잭션사이에서 효율적으로 재사용한다. 독립적인 환경에서 스프링의 SingleConnectionFactory를 사용하면 공유된 JMS Connection를 사용하게 되고 각 트랜잭션은 자신만의 독립적인 Session을 갖게 된다. 아니면 ActiveMQ의 PooledConnectionFactory 클래스처럼 프로바이더에 특화된 풀링(pooling) 어댑터를 사용하는 것을 고려해 봐라.
JmsTemplate를 JtaTransactionManager나 분산 트랜잭션을 위해서 XA 기능을 가진 JMS ConnectionFactory와 함께 사용할 수도 있다. 이 경우 XA로 잘 설정한 ConnectionFactory같은 JTA 트랜잭션 관리자를 사용해야 한다! (사용하는 Java EE 서버나 JMS 프로바이더의 문서를 참고해라.)
Connection에서 Session을 생성하려고 JMS API를 사용하는 경우 관리하는 트랜잭션 환경과 관리하지 않는 트랜잭션 환경사이에서 코드를 재사용하는 것이 혼란스러울 수 있다. 이는 JMS API가 Session을 생성하는 팩토리 메서드를 딱 하나만 가지고 있고 트랜잭션과 승낙(acknowledgement) 모드에 대한 값을 필요로 하기 때문이다. 관리되는 환경에서는 해당 환경의 트랜잭션 인프라가 이러한 값을 설정하므로 JMS 연결을 감싸는 벤더의 랩퍼는 이러한 값을 무시한다. 관리되지 않는 환경에서 JmsTemplate을 사용하는 경우 sessionTransacted와 sessionAcknowledgeMode 프로퍼티를 사용해서 이러한 값을 지정할 수 있다. PlatformTransactionManager와 JmsTemplate를 같이 사용하는 경우 템플릿은 항상 트랜잭션이 가능한 JMS Session을 가질 것이다.
22.3 Message 전송
JmsTemplate에는 메시지 전송과 관련해서 편리한 메서드가 많이 있다. javax.jms.Destination 객체로 목적지를 지정하는 전송메서드와 JNDI 검색에 사용하는 문자열로 목적지를 지정하는 전송메서드가 있다. 목적지 인자를 받지 않는 전송 메서드는 기본 목적지를 사용한다. 1.0.2 구현체를 사용해서 큐에 메시지를 보내는 예제가 다음에 나와 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;
public class JmsQueueSender {
private JmsTemplate jmsTemplate;
private Queue queue;
public void setConnectionFactory(ConnectionFactory cf) {
this.jmsTemplate = new JmsTemplate(cf);
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public void simpleSend() {
this.jmsTemplate.send(this.queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello queue world");
}
});
}
}
이 예제는 제공된 Session 객체에서 텍스트 메시지를 생성하기 위해 MessageCreator 콜백을 사용한다. ConnectionFactory에 대한 참조를 전달해서 JmsTemplate을 생성한다. 아니면 인자가 없는 생성자와 connectionFactory를 제공하면 JavaBean 형식의 인스턴스를 생성하는데 사용할 수 있다.(BeanFactory나 일반적인 자바코드를 사용해서) 또는 스프링의 JmsGatewaySupport 기잔 클래스에서 생성하는 것을 생각해 볼 수 있는데 JmsGatewaySupport는 JMS 설정에 대한 빈 프로퍼티를 미리 만들어서 제공한다.
send(String destinationName, MessageCreator creator) 메서드로 목적지의 문자열 이름을 사용해서 메시지를 전송할 수 있다. 이 목적지 이름이 JNDI에 등록되어 있는 경우 JndiDestinationResolver 인스턴스에 템플릿의 destinationResolver 프로퍼티를 설정해야 한다.
JmsTemplate를 생성하고 기본 목적지를 지정했다면 send(MessageCreator c)는 이 목적지로 메시지를 전송한다.
22.3.1 메시지 컨버터의 사용
도메인 모델 객체를 전송하기 쉽도록 JmsTemplate에는 메시지의 데이터 내용에 대한 자바 객체를 인자로 받는 다양한 전송 메서드가 있다. JmsTemplate에서 오버로드된 convertAndSend()와 receiveAndConvert()는 변환 과정을 MessageConverter 인터페이스의 인스턴스로 위임한다. MessageConverter 인터페이스는 자바 객체와 JMS 메시지간의 변환에 대한 간단한 조건을 정의하고 있다. 기본 구현체 SimpleMessageConverter는 String과 TextMessage, byte[]와 BytesMesssage, java.util.Map와 MapMessage간의 변환을 지원한다.
컨버터를 사용함으로써 JMS로 보내고 받는 비즈니스 객체에만 집중할 수 있고 JMS 메시지로 어떻게 표현되는지에 대한 자세한 내용은 신경쓰지 않아도 된다.
최근의 샌드박스에는 자바빈과 MapMessage를 변환하는데 리플렉션을 사용하는 MapMessageConverter가 포함되어 있다. 직접 구현혈만한 다른 구현체 중에서 인기있는 것은 객체를 나타내는 TextMessage를 생성하는데 JAXB, Castor, XMLBeans, XStream같은 이미 존재하는 XML 마샬링 패키지를 사용하는 컨버터이다.
일반적으로 컨버터 클래스내에 캡슐화될 수 없는 메시지의 프로퍼티, 헤더, 바디를 설정하기 위해서 MessagePostProcessor 인터페이스는 메시지가 변환되고 나서 전송되기 전에 접근할 수 있게 해준다. 아래 예제는 java.util.Map가 메시지로 변환된 후에 메시지 헤더와 프로퍼티를 어떻게 수정하는지 보여준다.
1
2
3
4
5
6
7
8
9
10
11
12
public void sendWithConversion() {
Map map = new HashMap();
map.put("Name", "Mark");
map.put("Age", new Integer(47));
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws JMSException {
message.setIntProperty("AccountID", 1234);
message.setJMSCorrelationID("123-00001");
return message;
}
});
}
이는 메시지 형식이 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
MapMessage={
Header={
... standard headers ...
CorrelationID={123-00001}
}
Properties={
AccountID={Integer:1234}
}
Fields={
Name={String:Mark}
Age={Integer:47}
}
}
22.3.2 SessionCallback과 ProducerCallback
전송 작업이 다수의 일반적인 사용사례를 다룰 수 있지만 JMS Session이나 MessageProducer에서 다수의 작업을 수행하고자 하는 경우가 있다.
SessionCallback와 ProducerCallback는 JMS Session과 Session / MessageProducer 쌍을 각각 노축한다. JmsTemplate의 execute() 메서드는 이러한 콜백 메서드를 실행한다.
22.4 메시지 수신
22.4.1 동기 수신
JMS가 보통 비동기 처리와 관련되어 있지만 동기적으로 메시지를 처리할 수도 있다. receive(..) 메서드를 오버로드해서 동기적으로 처리할 수 있다.
동기적으로 메시지를 받는 동안 호출하는 스레드는 메시지를 이용할 수 있을 때까지 블럭킹된다. 호출하는 스레드가 불명확하게 블럭킹될 가능성이 있으므로 이는 위험한 작업이 될 수 있다. receiveTimeout 프로퍼티는 리시버가 메시지를 얼마나 오랫동안 기다려야 하는지를 지정한다.
22.4.2 비동기 수신 - 메시지 주도 POJO
EJB의 메시지 주도 빈(MDB, Message-Driven Bean)과 비슷한 방법으로 메시지 주도 POJO(MDP, Message-Driven POJO)는 JMS 메시지에 대해서 리시버로 동작한다. javax.jms.MessageListener 인터페이스를 반드시 구현해야 한다는 점이 MDP가 가진 하나의 제약사항(하지만 아래의 MessageListenerAdapter 클래스에 대한 논의를 참고해라.)이다. POJO가 여러 스레드에서 메시지를 받는 경우라면 스레드세이프하게 구현해야 한다는 점이 중요하다는 것을 기억해야 한다.
아래는 MDP 구현체의 간단한 예시이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
MessageListener를 구현했으면 메시지 리스너 컨테이너를 만들 차례다.
스프링이 제공하는 메시지 리스너 컨테이너중 하나를 어떻게 정의하고 설정했는지 아래 예제를 참고해라.(이 경우에는 DefaultMessageListenerContainer)
1
2
3
4
5
6
7
8
<bean id="messageListener" class="jmsexample.ExampleListener" />
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
</bean>
다양한 메시지 리스너 컨테이너의 각 구현체가 지원하는 기능과 자세한 내용은 스프링 자바독을 참고해라.
22.4.3 SessionAwareMessageListener 인터페이스
SessionAwareMessageListener 인터페이스는 스프링에 특화된 인터페이스로 JMS MessageListener와 비슷한 계약(contract)을 제공하지만 Message를 받은 JMS Session에 접근해서 메시지를 처리할 수 있는 메서드도 제공한다.
1
2
3
4
5
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
구현한 MDP가 어떤 메시지에도 응답할 수 있기를 (onMessage(Message, Session) 메서드에서 제공된 Session을 사용해서) 원한다면 MDP가 이 인터페이스 (표준 JMS MessageListener 인터페이스를 선호하는 경우)를 구현하도록 할 수 있다. 스프링이 제공하는 모든 메시지 리스너 컨테이너 구현체는 MessageListener나 SessionAwareMessageListener 인터페이스를 구현한 MDP를 지원한다. SessionAwareMessageListener를 구현한 클래스는 이 인터페이스로 스프링에 의존성을 가진다는 것을 유의해야 한다. 이를 사용할 것인지 아닌지는 어플리케이션 개발자나 아키텍쳐의 선택이다.
SessionAwareMessageListener 인터페이스의 ‘onMessage(..)’ 메서드가 JMSException를 던진다는 것을 기억해라. 표준 JMS MessageListener 인터페이스와는 달리 SessionAwareMessageListener 인터페이스를 사용했을 때 던저진 예외를 처리하는 것은 클라이언트 코드의 몫이다.
22.4.4 MessageListenerAdapter
MessageListenerAdapter 클래스는 스프링에서 비동기 메시지를 지원하는 마지막 컴포넌트다. 간단히 말하면 이 클래스는 거의 모든 클래스를 MDP로 노출하도록 해준다. (물론 몇가지 제약사항이 있다.)
다음의 인터페이스 정의를 보자. 이 인터페이스가 MessageListener나 SessionAwareMessageListener를 상속하지 않았음에도 MessageListenerAdapter를 사용해서 MDP로 사용할 수 있다. 다양한 메시지 처리 메서드가 어떻게 받아서 처리할 수 있는 다양한 Message의 내용에 따라 타입을 지정하는지도 참고해라.
1
2
3
4
5
6
7
8
9
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
1
2
3
public class DefaultMessageDelegate implements MessageDelegate {
// 구현부는 생략했다...
}
특히 MessageDelegate 인터페이스의 구현체 (위의 DefaultMessageDelegate 클래스)가 어떻게 JSM에 대한 의존성은 전혀 갖지 않는지 봐라. 이는 다음의 설정으로 MDP로 만들 POJO다.
1
2
3
4
5
6
7
8
9
10
11
12
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
</bean>
다음은 또다른 MDP의 예시로 JMS TextMessage 메시지만 받아서 처리할 수 있다. 어떻게 메시지 처리 메서드가 실제로는 ‘receive’로 (MessageListenerAdapter의 메시지 처리 메서드의 이름은 기본적으로 ‘handleMessage’이다.) 호출되게 설정하는지 참고해라. ‘receive(..)’ 메서드가 JMS TextMessage 메시지에만 받고 응답하도록 타입을 지정하는 방법도 참고해라.
1
2
3
public interface TextMessageDelegate {
void receive(TextMessage message);
}
1
2
3
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// 구현부는 생략했다...
}
MessageListenerAdapter의 설정은 다음과 같을 것이다.
1
2
3
4
5
6
7
8
9
10
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- 자동 메시지 컨텍스트 추출을 원하지 않는다 -->
<property name="messageConverter">
<null/>
</property>
</bean>
위의 ‘messageListener’가 TextMessage가 아닌 다른 타입의 JMS Message를 받는다면 IllegalStateException가 던져질 것이다.(그리고 이어진 메시지는 무시할 것이다.) 핸들러 메서드가 void가 아닌 값을 반환하는 경우 자동적으로 응답 Message를 다시 보내는 것은 MessageListenerAdapter클래스의 또다른 기능이다. 다음 인터페이스와 클래스를 보자.
1
2
3
4
public interface ResponsiveTextMessageDelegate {
// 반환 타입을 잘 봐라...
String receive(TextMessage message);
}
1
2
3
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// 구현부는 생략했다...
}
위의 DefaultResponsiveTextMessageDelegate를 MessageListenerAdapter와 함께 사용한 경우 ‘receive(..)’ 메서드의 실행후 반환된 null이 아닌 모든 값은 TextMessage로 변환될 것이다.(기본 설정에 따라) 그 다음 변환된 TextMessage는 원래의 Message의 JMS Reply-To 프로퍼티에 정의된(존재하는 경우) Destination이나 MessageListenerAdapter에 설정된 기본 Destination(설정되었다면)으로 보내진다. Destination가 없다면 InvalidDestinationException가 던져질 것이다.(이 예외는 무시되지 않고 호출 스택에 추가될 것이다.)
22.4.5 트랜잭션에서의 메시지 처리
트랜잭션에서 메시지 리스너를 호출하려면 리스너 컨테이너만 재설정하면 된다.
리스너 컨테이너 정의의 sessionTransacted 플래그로 로컬 리소스 트랜잭션은 활성화할 수 있다. 로컬 리소스 트랜잭션을 활성화하면 활성화된 JMS 트랜잭션내에서 각 메시지 리스너 호출을 할 수 있고 리스너 실행이 실패한 경우 메시지 수신이 롤백된다. 응답 메시지 전송 (SessionAwareMessageListener로)도 같은 로컬 트랜잭션에 포함되어 있지만 다른 리소스에 대한 작업(데이터베이스 접근 같은)은 독립적으로 수행될 것이다. 이는 데이터베이스 처리는 커밋되었지만 메시지 처리는 커밋이 실패한 경우를 포함해서 리스너 구현체에서 중복 메시지 탐지를 해야 한다.
1
2
3
4
5
6
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
외부에서 관리하는 트랜잭션에 참여하려면 트랜잭션 관리자를 설정하고 외부 관리 트랜잭션을 지원하는 리스너 컨테이너(보통 DefaultMessageListenerContainer)를 사용해야 한다.
XA 트랜잭션에 참가하도록 메시지 리스너 컨테이너를 설정하려면 JtaTransactionManager(이 트랜잭션 관리자는 기본적으로 Java EE 서버의 트랜잭션 하위시스템에 위임한다.)를 설정해야 한다. 여기서 의존 JMS ConnectionFactory는 XA 기능이 있어야 하고 JTA 트랜잭션 코디네이터(JTA transaction coordinator)가 적절히 등록되어 있어야 한다! (Java EE 서버의 JNDI 리소스 설정을 확인해라.) 이는 메시지 수신이 데이터베이스 접근과 함께 같은 트랜잭션에 있도록 한다.(통일된 커밋 개념을 가지지만 XA 트랜잭션 로그 비용이 크다.)
1
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
앞에서 본 컨테이너 설정에 위 부분을 추가하면 된다. 컨테이너가 나머지를 관리할 것이다.
1
2
3
4
5
6
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>
22.5 JCA 메시지 종점(Message Endpoints)에 대한 지원
스프링 2.5부터 JCA에 기반한 MessageListener 컨테이너도 지원하기 시작했다. JmsMessageEndpointManager는 프로바이더의 ResourceAdapter 클래스 이름으로 ActivationSpec 클래스 이름을 자동적으로 결정하려고 할 것이다. 그래서 다음 예제에서 보듯이 스프링의 지네릭 JmsActivationSpecConfig을 제공하는 것이 보통 가능하다.
1
2
3
4
5
6
7
8
9
<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
<property name="resourceAdapter" ref="resourceAdapter"/>
<property name="activationSpecConfig">
<bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
<property name="destinationName" value="myQueue"/>
</bean>
</property>
<property name="messageListener" ref="myMessageListener"/>
</bean>
또는 주어진 ActivationSpec 객체에 JmsMessageEndpointManager를 설정할 수 있다. ActivationSpec 객체도 JNDI 검색 (를 사용해서)에서 올 것이다.
1
2
3
4
5
6
7
8
9
10
<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
<property name="resourceAdapter" ref="resourceAdapter"/>
<property name="activationSpec">
<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="myQueue"/>
<property name="destinationType" value="javax.jms.Queue"/>
</bean>
</property>
<property name="messageListener" ref="myMessageListener"/>
</bean>
예제에서 설명한대로 스프링의 ResourceAdapterFactoryBean를 사용하면 로컬로 대상 ResourceAdapter를 설정한다.
1
2
3
4
5
6
7
8
9
10
<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
<property name="resourceAdapter">
<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="tcp://localhost:61616"/>
</bean>
</property>
<property name="workManager">
<bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
</property>
</bean>
지정한 WorkManager도 환경에 특화된 스레드 풀을 가리킬 것이다.(보통 SimpleTaskWorkManager’s “asyncTaskExecutor” 프로퍼티를 통해서) 다중 아답터를 사용한다면 모든 ResourceAdapter 인스턴스에 공유 스레드 풀을 정의하는 것을 고려해 봐라.
일부 환경(WebLogic 9 이상 등)에서는 JNDI (를 사용해서)에서 전체 ResourceAdapter 객체를 가져올 것이다. 스프링 기반의 메시지 리스너는 서버가 제공하는 ResourceAdapter와 상호작용할 수 있고 서버에 내장된 WorkManager도 사용할 수 있다.
자세한 내용은 JmsMessageEndpointManager, JmsActivationSpecConfig, ResourceAdapterFactoryBean의 JavaDoc를 참고해라.
스프링은 JMS에 의존하지 않는 일반적인 JCA 메시지 엔트포인트 관리자 org.springframework.jca.endpoint.GenericMessageEndpointManager도 제공한다. 이 컴포넌트로 모든 종류의 메시지 리스너(CCI MessageListener등)와 프로바이더에 특화된 ActivationSpec 객체를 사용할 수 있다. 사용하는 커넥터의 실제 기능은 JCA 프로바이더의 문서를 참고하고 스프링에 특화된 자세한 설정은 GenericMessageEndpointManager JavaDoc을 참고해라.
Note
JCA 기반의 메시지 엔드포인트 관리는 EJB 2.1 메시지 주도 빈과 아주 비슷하다. JCA 기반의 메시지 엔드포인트 관리도 같은 의존 리소스 프로바이더 규약을 사용한다. EJB 2.1 MDB에서처럼 JCA 프로바이더가 지원하는 모든 메시지 리스너 인터페이스는 스프링 컨텍스트에서도 사용할 수 있다. 하지만 JMS가 JCA 엔드포인트 관리 규약에서 사용하는 가장 일반적인 엔드포인트 API이므로 스프링의 JMS 지원이 확실히 ‘편리하다’.
22.6 JMS 네임스페이스 지원
스프링 2.5에는 JMS 설정을 간략히 할 수 있는 XML 네임스페이스가 도입되었다. JMS 네임스페이스 요소를 사용하려면 JMS 스키마를 참조해야 한다.
1
2
3
4
5
6
7
8
9
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
</beans>
네임스페이스는 두개의 최상위 요소
1
2
3
4
5
<jms:listener-container>
<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>
</jms:listener-container>
위의 예제는 Section 22.4.4, “MessageListenerAdapter”에서 보여준 두 리스너 컨테이너 빈 정의와 두 MessageListenerAdapter 빈 정의를 생성하는 예제와 같다. 위에서 보여준 속성을 조금 더 설명하자면 listener 요소는 여러가지 선택적인 값을 포함할 수 있다. 다음 표에 사용가능한 모든 속성이 나와 있다.
Table 22.1. JMS 요소의 속성
속성 | 설명 |
---|---|
id | 제공하는 리스너 컨테이너의 빈(bean) 이름. 지정하지 않을 경우 빈 이름을 자동으로 생성한다. |
}destination(필수) | 해당 리스너의 목적지 이름으로 DestinationResolver 전략으로 처리한다. |
ref(필수) | 핸들러 객체의 빈 이름. |
method | 호출할 핸들러 메서드의 이름. ref가 MessageListener나 스프링 SessionAwareMessageListener를 가리킨다면 이 속성을 제외할 것이다. |
response-destination | 응답 메시지를 보낼 기본 응답 목적지의 이름. 이 이름은 요청 메시지에 “JMSReplyTo” 필드가 없는 경우에 적용될 것이다. 이 목적지의 타입은 리스너 컨테이너의 “destination-type” 속성으로 결정한다. Note: 이 값은 각 최종 객체가 응답 메시지로 잘 변환되도록 반환 값을 가진 리스너 메서드에만 적용한다. |
subscription | 존재한다면 지속가능한(durable) 구독의 이름. |
selector | 해당 리스너에 대한 선택적인 메시지 선택자. |
<listener-container/>
요소도 여러가지 선택적인 속성을 받는다. 그래서 기본 JMS 설정과 리소스 참조, 다양한 전략(예를 들면 taskExecutor와 destinationResolver)을 커스터마이징할 수 있다. 이러한 속성을 사용해서 편리한 네임스페이스의 장점을 이용하면서 리스너 컨테이너를 커스터마이징해서 정의할 수 있다.
1
2
3
4
5
6
7
8
9
10
<jms:listener-container connection-factory="myConnectionFactory"
task-executor="myTaskExecutor"
destination-resolver="myDestinationResolver"
transaction-manager="myTransactionManager"
concurrency="10">
<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>
</jms:listener-container>
다음 표에 사용할 수 있는 모든 속성이 나와 있다. 각 프로퍼티의 자세한 내용은 AbstractMessageListenerContainer와 그 하위 클래스의 클래스 수준 JavaDoc을 참고해라. Javadoc에는 트랜잭션 선택과 메시지 반환 시나리오에 대한 논의도 나와 있다.
Table 22.2. JMS 요소의 속성
속성 | 설명 |
---|---|
container-type | 해당 리스너 컨테이너의 타입. 사용할 수 있는 옵션은 default, simple, default102, simple102가 있다.(기본값은 ‘default’이다.) |
connection-factory | JMS ConnectionFactory 빈에 대한 참조(기반 빈 이름은 ‘connectionFactory’이다.) |
task-executor | JMS 리스너 인보커에 대한 스프링 TaskExecutor 참조. |
destination-resolver | JMS Destinations 처리의 DestinationResolver 전략에 대한 참조. |
message-converter | JMS 메시지를 리스너 메시지 인자로 변환하는 MessageConverter 전략에 대한 참조. 기본값은 SimpleMessageConverter이다. |
destination-type | 해당 리스너에 대한 JMS 목적지 타입으로 queue, topic, durableTopic가 있다. 기본값은 queue이다. |
client-id | 해당 리스너 컨테이너에 대한 JMS 클라이언트 아이디. 지속가능한 구독을 사용하는 경우에는 지정해야 한다. |
cache | JMS 리소스에 대한 캐시 수준으로 none, connection, session, consumer, auto의 값을 사용할 수 있다. 기본값(auto)에서 외부 트랜잭션 관리자를 지정하지 않는 한 캐시 수준은 사실상 “consumer”가 될 것이다. 외부 트랜잭션 관리자를 지정한 경우 기본값이 none가 될 것이다.(주어진 ConnectionFactory의 Java EE 방식 트랜잭션 관리가 XA식 풀(pool)이라고 가정한다.) |
acknowledge | 네이티브 JMS 확인(acknowledge) 모드로 auto, client, dups-ok, transacted의 값을 사용할 수 있다. transacted의 값은 로컬 트랜잭션이 적용된 Session을 활성화한다. 아니면 아래에서 설명하는 transaction-manager 속성을 지정해라. 기본값은 auto이다. |
transaction-manager | 외부 PlatformTransactionManager(보통 XA기반 트랜잭션 코디네이터로 예를 들면 스프링의 JtaTransactionManager이다)에 대한 참조. 지정하지 않은 경우 네이티브 확인을 사용한다. (“acknowledge” 속성 참고) |
concurrency | 각 리스너를 시작할때 동시에 사용할 세션/컨슈머의 수. 최대 수를 숫자로 지정하거나(예: “5”) 최대/최소 범위를 지정(예: “3-5”)할 수 있다. 지정한 최소값은 힌트값으로만 사용하고 런타임에서는 무시한다. 기본값은 1이다. 토픽(topic) 리스너이거나 큐의 순서가 중요한 경우에는 concurrency를 1로 지정하고 일반적인 큐에는 1 이상으로 지정하는 것을 고려해봐라. |
prefetch | 단일 세션에 로드할 최대 메시지의 수. 동시성 컨슈머의 기아상태(starvation)를 유도하려면 이 값을 더 크게 지정해라! |
“jms” 스키마를 지원하는 JCA 기반 리스너 컨테이너를 설정하는 것은 아주 비슷하다.
1
2
3
4
5
6
7
8
<jms:jca-listener-container resource-adapter="myResourceAdapter"
destination-resolver="myDestinationResolver"
transaction-manager="myTransactionManager"
concurrency="10">
<jms:listener destination="queue.orders" ref="myMessageListener"/>
</jms:jca-listener-container>
다양한 JCA에서 사용할 수 있는 설정 옵션은 다음 표에 나와 있다.
속성 | 설명 |
---|---|
resource-adapter | JCA ResourceAdapter 빈에 대한 참조(기본 빈 이름은 ‘resourceAdapter’다). |
activation-spec-factory | JmsActivationSpecFactory에 대한 참조. JMS 프로바이더와 프로바이더의 ActivationSpec 클래스를 자동으로 탐지하는 것이 기본값이다.(DefaultJmsActivationSpecFactory 참고) |
destination-resolver | JMS Destinations 처리의 DestinationResolver 전략에 대한 참조. |
message-converter | JMS 메시지를 리스너 메시드의 인자로 변환하는 MessageConverter 전략에 대한 참조. 기본값은 SimpleMessageConverter이다. |
destination-type | 해당 리스너의 JMS 목적지 종류로 queue, topic, durableTopic가 있다. 기본값은 queue다. |
client-id | 해당 리스너 컨테이너에 대한 JMS 클라이언트 아이디. 지속가능한 구독을 사용하는 경우 지정해야 한다. |
acknowledge | 네이티브 JMS 승인 모드로 auto, client, dups-ok, transacted가 있다. transacted의 값은 로컬로 트랜잭션이 적용되 Session를 활성화시킨다. 아니면 아래에서 설명한 transaction-manager를 지정해라. 기본값은 auto다. |
transaction-manager | 들어오는 각 메시지에 XA 트랜잭션을 시작하는 스프링 JtaTransactionManager나 javax.transaction.TransactionManager에 대한 참조. 지정하지 않으면 네이티브 승인을 사용한다.(“acknowledge” 속성 참조) |
concurrency | 각 리스너에 동시에 시작할 세션/컨슈머의 수. 최대 수를 숫자로 지정하거나(예: “5”) 최대/최소 범위를 지정(예: “3-5”)할 수 있다. 지정한 최소값은 힌트값으로만 사용하고 JCA 리스너 컨테이너를 사용하는 경우 런타인에서는 보통 무시한다. 기본값은 1이다. |
prefetch | 단일 세션에 로드할 메시지의 최대 수. 동시성 컨슈머의 기아상태(starvation)를 유도하려면 이 값을 더 크게 지정해라! |
Leave a comment