001package org.opengion.plugin.daemon; 002 003import java.io.File; 004import java.util.Date; 005 006import javax.jms.JMSException; 007import javax.jms.Message; 008import javax.jms.MessageListener; 009import javax.jms.TextMessage; 010 011// import org.opengion.fukurou.util.BizUtil; 012import org.opengion.fukurou.business.BizUtil; 013import org.opengion.fukurou.queue.QueueInfo; 014import org.opengion.fukurou.queue.QueueReceive; 015import org.opengion.fukurou.queue.QueueReceiveFactory; 016import org.opengion.fukurou.util.HybsTimerTask; 017import org.opengion.fukurou.util.StringUtil; 018import org.opengion.hayabusa.common.HybsSystem; 019import org.opengion.hayabusa.common.HybsSystemException; 020import org.opengion.hayabusa.queue.DBAccessQueue; 021 022/** 023 * メッセージキュー受信 メッセージキューの受信処理を行います。 024 * 025 * @og.group メッセージ連携 026 * 027 * @og.rev 5.10.15.2 (2019/09/20) 新規作成 028 * 029 * @version 5.0 030 * @author oota 031 * @since JDK7 032 * 033 */ 034public class Daemon_QueueReceive extends HybsTimerTask { 035 private int loopCnt = 0; 036 private QueueReceive queueReceive = null; 037 038 private static final int LOOP_COUNTER = 24; 039 040 private final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys("CLOUD_SQS_ACCESS_KEY"); 041 private final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys("CLOUD_SQS_SECRET_KEY"); 042 private final String MQ_QUEUE_TYPE; 043 private final String MQ_QURUE_SERVER_URL = HybsSystem.sys("MQ_QUEUE_SERVER_URL"); 044 private final String MQ_QUEUE_RECEIVE_LISTENER =HybsSystem.sys("MQ_QUEUE_RECEIVE_LISTENER"); 045 046 private final String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 047 private final String USER_ID = "CYYYYY"; 048 private final String PG_ID; 049 private final String DMN_NAME = "QueueReceiveDMN"; 050 private final DBAccessQueue dbAccessQueue; 051 052 /** 053 * コンストラクター 054 * 初期処理を行います。 055 */ 056 public Daemon_QueueReceive() { 057 super(); 058 059 // パラメータの設定 060 if(!StringUtil.isNull(HybsSystem.sys("MQ_QUEUE_TYPE"))) { 061 MQ_QUEUE_TYPE = HybsSystem.sys("MQ_QUEUE_TYPE").toUpperCase(); 062 PG_ID = StringUtil.cut("QueRec" + MQ_QUEUE_TYPE, 10); 063 }else { 064 throw new RuntimeException("システムリソースにMQ_QUEUE_TYPEを登録して下さい"); 065 } 066 067 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 068 } 069 070 /** 071 * 初期処理 MQサーバに接続します。 072 */ 073 @Override 074 public void initDaemon() { 075 // 開始ログO 076 final StringBuilder errMsg = new StringBuilder(); 077 if (MQ_QUEUE_TYPE == null) { 078 errMsg.append("MQ_QUEUE_TYPE"); 079 } 080 if (MQ_QURUE_SERVER_URL == null) { 081 errMsg.append(" MQ_QUEUE_SERVER_URL"); 082 } 083 084 if (errMsg.length() > 0) { 085 errMsg.append(" キュータイプを特定するために、左記のシステムリソースを登録して下さい。"); 086 throw new HybsSystemException(errMsg.toString()); 087 } 088 089 final String queueType = MQ_QUEUE_TYPE.toUpperCase(); 090 091 // 開始ログ 092 System.out.println("MQキュータイプ:" + queueType); 093 System.out.println("MQサーバーURL:" + MQ_QURUE_SERVER_URL); 094 095 queueReceive = QueueReceiveFactory.newQueueReceive(queueType); 096 097 queueReceive.connect(MQ_QURUE_SERVER_URL, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 098 099 } 100 101 /** 102 * 開始処理 タイマータスクのデーモン処理の開始ポイントです。 103 */ 104 @Override 105 protected void startDaemon() { 106 if (loopCnt % LOOP_COUNTER == 0) { 107 loopCnt = 1; 108 System.out.println(); 109 System.out.print(toString() + " " + new Date() + " "); 110 } else { 111 // 対象 キュー名(グループ名)とbizlogic名の取得処理 112 final String[][] ge67vals = dbAccessQueue.setlectGE67(); 113 // キュー情報登録チェック 114 if (ge67vals.length == 0) { 115 final String errMsg = "GE67にキュー情報が登録されていません。"; 116 throw new RuntimeException(errMsg); 117 } 118 // MQとSQSで処理を分岐 119 // MQ:指定キューIDからキューメッセージを取得 120 // SQS:キューメッセージを取得してからキューID(グループID)を取得 121 switch (MQ_QUEUE_TYPE) { 122 case "MQ": 123 processMq(ge67vals); 124 break; 125 case "SQS": 126 processSqs(ge67vals); 127 break; 128 default: 129 final String errMsg = "リソース(MQ_QUEUE_TYPE)の値が不正です。:" + MQ_QUEUE_TYPE; 130 throw new RuntimeException(errMsg); 131 } 132 133 loopCnt++; 134 } 135 } 136 137 /** 138 * MQ用の処理 139 * GE67に登録されているキューIDの、 140 * メッセージキューを取得して処理を行います。 141 * 142 * @param ge67vals GE67の配列データ 143 */ 144 private void processMq(final String[][] ge67vals) { 145 boolean listenerMode = false; 146 147 if("true".equals(MQ_QUEUE_RECEIVE_LISTENER)) { 148 listenerMode = true; 149 } 150 151 if(listenerMode) { 152 // リスナーの初期化 153 queueReceive.closeListener(); 154 } 155 156 // ge67のキューリスト分繰り返します 157 for (int row = 0; row < ge67vals.length; row++) { 158 final String queueId = ge67vals[row][0]; 159 final String bizLogicId = ge67vals[row][1]; 160 161 if(listenerMode) { 162 // リスナーを設定して、動的な受信処理(MQ専用) 163 final QueueReceiveListener listener = new QueueReceiveListener(queueId, bizLogicId); 164 queueReceive.setListener(queueId, listener); 165 }else { 166 // 1件の受信処理 167 final QueueInfo queueInfo = queueReceive.receive(queueId); 168 if (queueInfo != null) { 169 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 170 // 1件処理を行ったら処理を終了します。 171 break; 172 } 173 } 174 } 175 } 176 177 /** 178 * SQS用の処理 179 * SQSはグループIDを指定して、キューを取得することはできず、 180 * 任意のキューを1つ取得してから、 181 * 判定処理を行います。 182 * GE67に登録されていないグループIDのキューが取得された場合は、 183 * GE68にエラーレコードを登録します。 184 * 185 * @param ge67vals GE67の配列データ 186 */ 187 private void processSqs(final String[][] ge67vals) { 188 // 下記はSQSの場合(キューを1件取得して処理) 189 final QueueInfo queueInfo = queueReceive.receive(null); 190 191 // キューが未取得の場合 192 if(queueInfo == null) { 193 return; 194 } 195 196 // 受信したキューを処理 197 final String groupId = queueInfo.getSqsFifoGroupId(); 198 Boolean existsFlg = false; 199 // valsにグループIDのレコードが存在するか検索 200 for (int row = 0; row < ge67vals.length; row++) { 201 final String queueId = ge67vals[row][0]; 202 203 if (groupId != null && groupId.equals(queueId)) { 204 // 該当レコードあり 205 final String bizLogicId = ge67vals[row][1]; 206 processMessage(queueId, bizLogicId, queueInfo.getMessage()); 207 208 existsFlg = true; 209 break; 210 } 211 } 212 213 if (!existsFlg) { 214 // 該当groupIdの未登録エラー 215 // 処理番号生成 216 final String syoriNo = dbAccessQueue.generateSyoriNo(); 217 dbAccessQueue.insertGE68(groupId, syoriNo, null, queueInfo.getMessage()); 218 dbAccessQueue.updateGE68Error(syoriNo, "SQSキューに設定されているグループIDが、GE67に未登録です。"); 219 } 220 } 221 222 /** 223 * キャンセル処理 224 * タイマータスクのデーモン処理の終了ポイントです。 225 * 226 * @return キャンセルできれば、true 227 */ 228 @Override 229 public boolean cancel() { 230 if (queueReceive != null) { 231 queueReceive.close(); 232 } 233 234 return super.cancel(); 235 } 236 237 /** 238 * メッセージの処理 239 * 受信したメッセージをbizLogicに渡して、 240 * 処理を実行します。 241 * 242 * @param queueId キューID 243 * @param bizLogicId ビズロジックID 244 * @param msgText 受信メッセージ 245 */ 246 private void processMessage(final String queueId, final String bizLogicId, final String msgText) { 247 String syoriNo = ""; 248 try { 249 // 処理番号生成 250 syoriNo = dbAccessQueue.generateSyoriNo(); 251 252 // 管理テーブル登録 253 dbAccessQueue.insertGE68(queueId, syoriNo, bizLogicId, msgText); 254 255 // bizLogicの処理を実行 256 callActBizLogic(SYSTEM_ID, bizLogicId, msgText); 257 258 // 管理テーブル更新(完了) 259 dbAccessQueue.updateGE68(syoriNo, DBAccessQueue.FGKAN_END); 260 261 } catch (Throwable te) { 262 // bizLogicでのエラーはログの未出力して、処理を継続します。 263 // bizLogicのエラー情報はCauseに格納されているため、Causeから取得します。 264 String errMessage = null; 265 if (te.getCause() != null) { 266 // causeが設定されている場合のエラー情報 267 errMessage = te.getCause().getMessage(); 268 } else { 269 // causeが未設定の場合のエラー情報 270 errMessage = te.getMessage(); 271 } 272 System.out.println(errMessage); 273 try { 274 // エラーテーブルに登録 275 dbAccessQueue.updateGE68Error(syoriNo, errMessage); 276 } catch (Exception e) { 277 // ここでのエラーはスルーします。 278 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 279 } 280 } 281 } 282 283 /** 284 * bizLogic処理の呼び出し 285 * 必要なパス情報をリソースから取得して、 286 * BizUtil.actBizLogicにパス情報を渡すことで、 287 * bizLogicの処理を行います。 288 * 289 * @param systemId システムID 290 * @param logicName ロジックファイル名 291 * @param msgText メッセージ 292 * @throws Throwable エラー情報 293 */ 294 private void callActBizLogic(final String systemId, final String logicName, final String msgText) throws Throwable { 295 // 対象 クラスパスの生成 296 // HotDeploy機能を使用する場合に、Javaクラスをコンパイルするためのクラスパスを設定します。 297 // 対象となるクラスパスは、WEB-INF/classes 及び WEB-INF/lib/*.jar です。 298 // bizLogicTag.javaのコードを移植 299 final StringBuilder sb = new StringBuilder(); 300 sb.append('.').append(File.pathSeparatorChar); 301 final File lib = new File(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "lib"); 302 final File[] libFiles = lib.listFiles(); 303 for (int i = 0; i < libFiles.length; i++) { 304 sb.append(libFiles[i].getAbsolutePath()).append(File.pathSeparatorChar); 305 } 306 sb.append(HybsSystem.sys("REAL_PATH") + "WEB-INF" + File.separator + "classes").append(File.pathSeparatorChar); 307 // bizの下のパス 308 sb.append(HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH")).append(File.pathSeparatorChar); 309 // 上記で生成したクラスパスをclassPathに格納 310 final String classPath = sb.toString(); 311 312 // ソースパス情報の生成 313 final String srcDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_SRC_PATH"); 314 final String classDir = HybsSystem.sys("REAL_PATH") + HybsSystem.sys("BIZLOGIC_CLASS_PATH"); 315 final boolean isAutoCompile = HybsSystem.sysBool("BIZLOGIC_AUTO_COMPILE"); 316 final boolean isHotDeploy = HybsSystem.sysBool("BIZLOGIC_HOT_DEPLOY"); 317 318 // bizLogicに渡すパラメータ 319 final String[] keys = new String[] { "message" }; 320 final String[] vals = new String[] { msgText }; 321 322 // bizLogic処理の実行 323 BizUtil.actBizLogic(srcDir, classDir, isAutoCompile, isHotDeploy, classPath, systemId, logicName, keys, vals); 324 } 325 326 /** 327 * 受信処理リスナー用のインナークラス 328 * QueueReceiveリスナークラス リスナー用のクラスです。 329 * MQに設定することで、メッセージが受信されると、 330 * onMessageメソッドが実行されます。 331 * 332 */ 333 class QueueReceiveListener implements MessageListener { 334// private String queueId = ""; 335// private String bizLogicId = ""; 336 private final String queueId ; 337 private final String bizLogicId ; 338 339 /** 340 * コンストラクター 初期処理を行います。 341 * 342 * @param quId キューID 343 * @param bizId ビズロジックID 344 */ 345 public QueueReceiveListener(final String quId, final String bizId) { 346 queueId = quId; 347 bizLogicId = bizId; 348 } 349 350 /** 351 * メッセージ受信処理 MQサーバにメッセージが受信されると、 メソッドの処理が行われます。 352 * 353 * @param message 受信メッセージ 354 */ 355 @Override 356 public void onMessage(final Message message) { 357 // 要求番号 358 String ykno = ""; 359 360 // メッセージ受信 361 final TextMessage msg = (TextMessage) message; 362 String msgText = ""; 363 364 try { 365 // キューサーバのメッセージを取得 366 msgText = msg.getText(); 367 368 // メーッセージの受信応答を返します。 369 msg.acknowledge(); 370 371 processMessage(queueId, bizLogicId, msgText); 372 373 } catch (JMSException jmse) { 374 try { 375 // 管理テーブル更新 376 // 管理テーブル更新(エラー) 377 dbAccessQueue.updateGE68(ykno, DBAccessQueue.FGKAN_ERROR); 378 } catch (Exception e) { 379 // ここでのエラーはスルーします。 380 System.out.println("管理テーブル登録エラー:" + e.getMessage()); 381 } 382 383 throw new HybsSystemException("bizLogicの処理中にエラーが発生しました。" + jmse.getMessage()); 384 } 385 } 386 } 387}