001/*
002 * Copyright (c) 2009 The openGion Project.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
013 * either express or implied. See the License for the specific language
014 * governing permissions and limitations under the License.
015 */
016
017package org.opengion.fukurou.queue;
018
019import javax.jms.Connection;
020import javax.jms.JMSException;
021import javax.jms.MessageProducer;
022import javax.jms.Queue;
023import javax.jms.QueueConnectionFactory;
024import javax.jms.QueueSession;
025import javax.jms.Session;
026import javax.jms.TextMessage;
027import javax.naming.Context;
028import javax.naming.InitialContext;
029import javax.naming.NamingException;
030
031import org.apache.activemq.ActiveMQConnection;
032import org.apache.activemq.ActiveMQConnectionFactory;
033// import org.opengion.hayabusa.common.HybsSystemException;
034
035// import com.sun.star.uno.RuntimeException;
036
037/**
038 * MQサーバへのメッセージキュー送信用クラス
039 * 
040 * MQサーバへのメッセージキュー送信用のクラスです。
041 * Apache ActiveMQとAmazonMQへの送信が可能です。
042 * tomcatからの送信(JNDI利用)と、
043 * バッチ処理(urlを指定し接続)の2通りが可能です。
044 * 
045 * ※Apache ActiveMQとAmazonMQの切り替えは、
046 * jmsServerの接続先URLを変更するのみで接続の変更が可能です。
047 * (proxy環境からAmazonMqへの接続は行えない場合があります)
048 * 
049 * @og.group メッセージ連携
050 * 
051 * @og.rev 5.10.14.0 (2019/08/01) 新規作成
052 * 
053 * @version 5
054 * @author oota
055 * @since JDK7
056 *
057 */
058public class QueueSend_MQ implements QueueSend {
059        private Connection              connection;
060        private Session                 session;
061        private MessageProducer sender;
062        private Context                 ctx;
063        // バッチ用フィールド
064        private boolean batch;
065        private String  mqUserId        = "";
066        private String  mqPassword      = "";
067
068        /**
069         * 接続処理
070         * MQサーバに接続を行います。
071         * 
072         * @param jmsServer jmsサーバ接続名(バッチの場合はurl)
073         */
074        public void connect(final String jmsServer) {
075                try {
076                        ctx = new InitialContext();
077                        // 1. Connectionの作成s
078//                      QueueConnectionFactory factory = null;
079                        final QueueConnectionFactory factory;
080                        if (batch) {
081                                // バッチ処理の場合。URL指定で、ユーザIDとパスワードを指定して接続。
082                                mqUserId        = System.getProperty("mqUserId");
083                                mqPassword      = System.getProperty("mqPassword");
084                                factory         = new ActiveMQConnectionFactory(jmsServer);
085                                connection      = (ActiveMQConnection)factory.createConnection(mqUserId, mqPassword);
086                        } else {
087                                // tomcat接続の場合。JNDIを利用して接続。
088                                factory = (QueueConnectionFactory) ctx.lookup("java:comp/env/" + jmsServer);
089                                connection = (ActiveMQConnection)factory.createConnection();
090                        }
091
092                        // 2. Connectioの開始
093                        connection.start();
094
095                } catch (final JMSException jmse) {
096                        throwErrMsg("MQサーバーの接続に失敗しました。" + jmse.getMessage());
097                } catch (final NamingException ne) {
098                        throwErrMsg("名前解決に失敗しました。" + ne.getMessage());
099                }
100        }
101
102        /**
103         * 接続処理
104         * MQサーバに接続します。
105         * connect(String jmsServer)と同じ処理になります。
106         *
107         * @og.rev 5.10.15.0 (2019/08/30) 引数追加対応
108         * 
109         * @param jmsServer jmsサーバ情報
110         * @param sqsAccessKey アクセスキー(MQサーバでは未使用)
111         * @param sqsSecretKey シークレットキー(MQサーバでは未使用)
112         */
113        @Override
114        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
115                // MQではsqsAccessKeyとsqsSecretKeyは利用しません。
116                connect(jmsServer);
117        }
118
119        /**
120         * エラーメッセージ送信。
121         * 
122         * @og.rev 5.10.15.0 (2019/08/30) Hybs除外
123         * 
124         * @param errMsg エラーメッセージ
125         */
126        public void throwErrMsg(final String errMsg) {
127                throw new RuntimeException( errMsg );
128//              if (batch) {
129//                      // バッチ用エラー
130//                      throw new RuntimeException(errMsg);
131//              } else {
132//                      // 画面用エラー
133//                      throw new HybsSystemException(errMsg);
134//              }
135        }
136
137        /**
138         * メッセージ送信
139         * MQサーバにメッセージを送信します。
140         * 
141         * @param queueInfo 送信キュー情報
142         */
143        @Override
144        public void sendMessage(final QueueInfo queueInfo) {
145                try {
146                        // 初期チェック
147                        if (connection == null) {
148                                throwErrMsg("MQサーバに接続されていません。");
149                        }
150
151                        // 1. QueueSessionの作成
152                        session = connection.createSession(queueInfo.isMqTransacted(), queueInfo.getMqAcknowledgeMode());
153                        if (session == null) {
154                                throwErrMsg("キューセッションの生成に失敗しました。");
155                        }
156
157                        // 2. Queueの作成
158//                      Queue queue = null;
159//                      queue = session.createQueue(queueInfo.getMqQueueName());
160                        final Queue queue = session.createQueue(queueInfo.getMqQueueName());
161                        sender = session.createProducer(queue);
162
163                        // 3. テキストメッセージの作成
164                        final TextMessage msg = session.createTextMessage(queueInfo.getMessage());
165
166                        // 4. 送信処理
167                        sender.send(msg);
168
169                } catch (JMSException e) {
170                        throwErrMsg("キューの送信処理に失敗しました。" + e.getMessage());
171                }
172        }
173
174        /**
175         * クローズ処理
176         * MQサーバとの接続をクローズします。
177         */
178        @Override
179        public void close() {
180                if (ctx != null) {
181                        try {
182                                ctx.close();
183                        } catch (Exception e) {
184                                System.out.println("ctxのクローズに失敗しました。");
185                        }
186                }
187                // 1. sender,session,connectionのクローズ処理
188                if (sender != null) {
189                        try {
190                                sender.close();
191                        } catch (Exception e) {
192                                System.out.println("senderのクローズに失敗しました。");
193                        }
194                }
195                if (session != null) {
196                        try {
197                                session.close();
198                        } catch (Exception e) {
199                                System.out.println("sessionのクローズに失敗しました。");
200                        }
201                }
202                if (connection != null) {
203                        try {
204                                connection.close();
205                        } catch (Exception e) {
206                                System.out.println("connectionのクローズに失敗しました。");
207                        }
208                }
209        }
210
211        /**
212         * バッチ処理判定フラグを設定します。
213         * バッチ処理の場合は引数で接続先情報を与えます。
214         * それ以外の場合(Tomcat)ではJNDIより情報を取得します。
215         * 
216         * @param batchFlg バッチ処理判定フラグ
217         */
218        @Override
219        public void setBatchFlg(final Boolean batchFlg) {
220                batch = batchFlg;
221        }
222
223        /**
224         * テスト用メソッド
225         * テスト実行用です。
226         * 
227         * @param args 引数
228         */
229        public static void main(final String[] args) {
230                System.out.println("main start");
231                // 送信情報の設定
232                final String url = "tcp://localhost:61616";
233                final String queueName = "test01";
234                final String msg = "送信メッセージ";
235                
236                final QueueInfo queueInfo = new QueueInfo();
237                queueInfo.setMqQueueName(queueName);
238                queueInfo.setMqTransacted(false);
239                queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
240                queueInfo.setMessage(msg);
241
242                final QueueSend queueSend = new QueueSend_MQ();
243                queueSend.setBatchFlg(true);
244
245                try {
246                        queueSend.connect(url,null,null);
247//                      queueSend.connect(url);
248                        queueSend.sendMessage(queueInfo);
249                } catch (final Exception e) {
250                        System.out.println(e.getMessage());
251                } finally {
252                        queueSend.close();
253                }
254
255                System.out.println("main end");
256        }
257}