View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc.adapter.jdbc; 48 49 import com.rhi.architecture.config.ConfigurationException; 50 import com.rhi.architecture.logging.LogUtil; 51 import com.rhi.architecture.logging.Logger; 52 import com.rhi.architecture.parc.Error; 53 import com.rhi.architecture.parc.ProcessingException; 54 import com.rhi.architecture.parc.Record; 55 import com.rhi.architecture.parc.adapter.AbstractOutputAdapter; 56 import com.rhi.architecture.resource.InitializationException; 57 import com.rhi.architecture.xa.DatabaseTransaction; 58 import com.rhi.architecture.xa.Transaction; 59 60 import java.sql.Connection; 61 import java.sql.PreparedStatement; 62 import java.sql.SQLException; 63 import java.util.ArrayList; 64 import java.util.Collection; 65 import java.util.Iterator; 66 import java.util.List; 67 import java.util.Properties; 68 69 /*** 70 * JDBC Output Adapter. Helper class for interfaces implementing an 71 * Output Adapter using JDBC. 72 * 73 * @author Pete McKinstry 74 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 75 * 76 * @since 1.0 77 */ 78 public abstract class JDBCOutputAdapter extends AbstractOutputAdapter { 79 80 private static Logger log = null; 81 82 // output record SQL 83 private static final String PersistValidRecord_KEY = 84 "JDBCOutputAdapter.PersistValidRecord"; 85 private String PERSIST_VALID_RECORD_SQL; 86 87 // error record SQL 88 private static final String PersistErrorRecord_KEY = 89 "JDBCOutputAdapter.PersistErrorRecord"; 90 private String PERSIST_ERROR_RECORD_SQL; 91 92 // update source record, valid 93 private static final String UpdateSourceValidRecord_KEY = 94 "JDBCOutputAdapter.UpdateSourceRecordValid"; 95 private String UPDATE_SOURCE_VALID_SQL; 96 97 // update source record, error 98 private static final String UpdateSourceErrorRecord_KEY = 99 "JDBCOutputAdapter.UpdateSourceRecordError"; 100 private String UPDATE_SOURCE_ERROR_SQL; 101 102 /*** 103 * Default constructor 104 * 105 * @since 1.0 106 */ 107 public JDBCOutputAdapter() { 108 super(); 109 } 110 111 /*** 112 * Initialize the Logger, and load the SQL statements. 113 * 114 * @param props - the configuration data 115 * @throws InitializationException - Logger not properly configured 116 * 117 * @since 1.0 118 */ 119 public void init(Properties props) throws InitializationException { 120 121 try { 122 log = LogUtil.getLogger(); 123 } 124 catch (ConfigurationException e) { 125 throw new InitializationException("Logger creation failed.", e); 126 } 127 log.debug("init"); 128 129 this.PERSIST_VALID_RECORD_SQL = 130 props.getProperty(PersistValidRecord_KEY); 131 this.PERSIST_ERROR_RECORD_SQL = 132 props.getProperty(PersistErrorRecord_KEY); 133 this.UPDATE_SOURCE_VALID_SQL = 134 props.getProperty(UpdateSourceValidRecord_KEY); 135 this.UPDATE_SOURCE_ERROR_SQL = 136 props.getProperty(UpdateSourceErrorRecord_KEY); 137 138 super.init(props); 139 } 140 141 /*** 142 * Cleanup any resources. 143 * 144 * @since 1.0 145 */ 146 public void cleanup() { 147 log.debug("running cleanup"); 148 log = null; // help w/ GC. 149 super.cleanup(); 150 } 151 152 /*** 153 * Handle persistence of valid records from the framework. 154 * 155 * @param c - valid records. 156 * @param t - the transaction to use in committing the records. 157 * @return Collection - records errored during persistence. 158 * @exception ProcessingException 159 * 160 * @since 1.0 161 */ 162 protected Collection handleValidRecords(Collection c, Transaction t) 163 throws ProcessingException { 164 log.debug("persisting collection of " + c.size() + " valid records"); 165 ArrayList errors = new ArrayList(); 166 167 Iterator iter = c.iterator(); 168 int counter = 0; 169 while (iter.hasNext()) { 170 Record record = (Record) iter.next(); 171 ++counter; 172 if (record.isValid()) { 173 // persist the valid output record. 174 boolean result = persistValidRecord(record, t); 175 if (result == false) { 176 log.error( 177 "error persisting valid record = " + record.toString()); 178 // give subclasses the ability to add a application 179 // specific error to the record. 180 addErrorToRecord(record); 181 errors.add(record); 182 } 183 184 // mark the source record complete. (valid) 185 result = updateSourceForValidRecord(record, t); 186 if (result == false) { 187 log.error( 188 "error updating valid record = " + record.toString()); 189 // give subclasses the ability to add a application 190 // specific error to the record. 191 addErrorToRecord(record); 192 errors.add(record); 193 } 194 } 195 else { 196 errors.add(record); 197 } 198 } 199 200 return errors; 201 } 202 203 /*** 204 * Add or update valid records to the output table. 205 * 206 * @param record 207 * @param t 208 * @return boolean - true or false if the record was persisted properly. 209 * @exception ProcessingException - irrecoverable failure. 210 * 211 * @since 1.0 212 */ 213 protected boolean persistValidRecord(Record record, Transaction t) 214 throws ProcessingException { 215 PreparedStatement stmt = null; 216 boolean result = false; 217 218 try { 219 Connection conn = ((DatabaseTransaction) t).getConnection(); 220 if (conn == null) { 221 log.error("transaction connection null."); 222 throw new ProcessingException("transaction does not contain valid db connection."); 223 } 224 225 stmt = prepareValidStatement(record, conn); 226 227 int rowcount = stmt.executeUpdate(); 228 if (rowcount > 0) { 229 result = true; 230 } 231 return result; 232 } 233 catch (SQLException e) { 234 log().error( 235 "JDBCOutputAdapter.persistValidRecord() " 236 + "SQLException thrown. e = " 237 + e, 238 e); 239 log().error("record = " + record); 240 throw new ProcessingException( 241 "JDBCOutputAdapter.persistValidRecord() " + e.toString(), 242 e); 243 } 244 finally { 245 // connection is managed by the Transaction & should not be closed 246 // within the handleValidRecords()/handleErrorRecords() methods. 247 // ... but close the statement. 248 if (stmt != null) { 249 try { 250 stmt.close(); 251 } 252 catch (SQLException e) { 253 log.error("sqlexception closing db statement.", e); 254 } 255 } 256 } 257 } 258 259 /*** 260 * Get SQL, prepare SQL statement, and set all the appropriate fields. 261 * 262 * @param record 263 * @param conn 264 * @return PreparedStatement 265 * @throws SQLException 266 * @throws ProcessingException 267 * 268 * @since 1.0 269 */ 270 protected PreparedStatement prepareValidStatement( 271 Record record, 272 Connection conn) 273 throws SQLException, ProcessingException { 274 275 String sql = getPersistValidSQL(); 276 if (sql == null) { 277 log().error( 278 "JDBCOutputAdapter.prepareValidStatement(): " 279 + "SQL for persisting valid record missing."); 280 throw new ProcessingException("No SQL provided to persist valid record."); 281 } 282 PreparedStatement stmt = conn.prepareStatement(sql); 283 284 setValidStatementFields(record, stmt); 285 286 return stmt; 287 } 288 289 /*** 290 * Set all the fields on the prepared statement using the output record. 291 * Not abstract so that sub-classes that don't use prepareValidStatement 292 * need not implement this w/ a no-op, but if anyone who _should_ implement 293 * it doesn't, it will "fail fast" & let them know immediately. 294 * @param record 295 * @param stmt - the statement to execute. 296 * @throws ProcessingException 297 * @throws SQLException 298 * 299 * @since 1.0 300 */ 301 protected void setValidStatementFields( 302 Record record, 303 PreparedStatement stmt) 304 throws SQLException, ProcessingException { 305 throw new ProcessingException( 306 "setValidStatementFields() called, " 307 + "but not implemented on JDBCOutputAdapter."); 308 } 309 310 /*** 311 * Return the SQL for persisting (insert or update) a new valid record. 312 * 313 * @return String 314 * 315 * @since 1.0 316 */ 317 protected String getPersistValidSQL() { 318 return this.PERSIST_VALID_RECORD_SQL; 319 } 320 321 /*** 322 * General Case implementation for persisting error records. 323 * 324 * @param c - errored records 325 * @param t - the database transaction under which 326 * any SQL inserts/updates should be made. 327 * 328 * @exception ProcessingException 329 * 330 * @since 1.0 331 */ 332 protected void handleErrorRecords(Collection c, Transaction t) 333 throws ProcessingException { 334 log.debug("handleErrorRecords()"); 335 336 log.debug( 337 "persisting collection of " + c.size() + " errored records"); 338 339 Iterator iter = c.iterator(); 340 while (iter.hasNext()) { 341 Record record = (Record) iter.next(); 342 343 List list = record.getErrors(); 344 Error error = (Error) list.get(0); 345 if (error == null) { 346 throw new ProcessingException("record missing error"); 347 } 348 /*With error records, this is not recoverable, so throw 349 * a failure exception to cause a shutdown. 350 */ 351 boolean result = persistErrorRecord(error, t); 352 if (result == false) { 353 throw new ProcessingException( 354 "failure persisting " 355 + "error record = " 356 + record.toString()); 357 } 358 359 result = updateSourceForErrorRecord(record, t); 360 if (result == false) { 361 throw new ProcessingException( 362 "failure updating " 363 + "source for error record = " 364 + record.toString()); 365 } 366 367 } // while(recordIter.hasNext()) 368 } 369 370 /*** 371 * Add or update the error record. 372 * 373 * @param error 374 * @param t 375 * @return boolean 376 * @exception ProcessingException 377 * 378 * @since 1.0 379 */ 380 protected boolean persistErrorRecord(Error error, Transaction t) 381 throws ProcessingException { 382 PreparedStatement stmt = null; 383 boolean result = false; 384 try { 385 Connection conn = ((DatabaseTransaction) t).getConnection(); 386 if (conn == null) { 387 log.error("transaction connection null."); 388 throw new ProcessingException( 389 "transaction does not contain " + "a valid db connection."); 390 } 391 392 stmt = prepareErrorStatement(error, conn); 393 394 int rowcount = stmt.executeUpdate(); 395 if (rowcount > 0) { 396 result = true; 397 } 398 return result; 399 } 400 catch (SQLException e) { 401 log.error( 402 "JDBCOutputAdapter.persistErrorRecord() " 403 + "SQLException thrown. e = " 404 + e, 405 e); 406 log.error("record = " + error); 407 throw new ProcessingException( 408 "JDBCOutputAdapter.persistErrorRecord() " 409 + "SQLException thrown. e = " 410 + e, 411 e); 412 } 413 finally { 414 // connection is managed by the Transaction & should not be closed 415 // within the handleValidRecords()/handleErrorRecords() methods. 416 // ... but close the statement. 417 if (stmt != null) { 418 try { 419 stmt.close(); 420 } 421 catch (SQLException e) { 422 log.error("sqlexception closing db statement.", e); 423 } 424 } 425 } 426 } 427 428 /*** 429 * Get SQL, prepare SQL statement, and set all the error fields. 430 * 431 * @param error 432 * @param conn 433 * @return PreparedStatement 434 * @throws SQLException 435 * @throws ProcessingException 436 * 437 * @since 1.0 438 */ 439 protected PreparedStatement prepareErrorStatement( 440 Error error, 441 Connection conn) 442 throws SQLException, ProcessingException { 443 444 String sql = getPersistErrorSQL(); 445 if (sql == null) { 446 log().error( 447 "JDBCOutputAdapter.prepareErrorStatement(): " 448 + "SQL for persisting error record missing."); 449 throw new ProcessingException("No SQL provided to persist error record."); 450 } 451 452 PreparedStatement stmt = conn.prepareStatement(sql); 453 454 setErrorStatementFields(error, stmt); 455 456 return stmt; 457 } 458 459 /*** 460 * Set the appropriate fields on the prepared statement 461 * Not abstract so that sub-classes that don't use prepareErrorStatement 462 * need not implement this w/ a no-op, but if anyone who _should_ implement 463 * it doesn't, it will "fail fast" & let them know immediately. 464 * 465 * @param error 466 * @param stmt 467 * @throws SQLException 468 * @throws ProcessingException 469 * 470 * @since 1.0 471 */ 472 protected void setErrorStatementFields( 473 Error error, 474 PreparedStatement stmt) 475 throws SQLException, ProcessingException { 476 throw new ProcessingException( 477 "setErrorStatementFields() called, " 478 + "but not implemented on JDBCOutputAdapter."); 479 } 480 481 /*** 482 * Return the SQL for inserting or updating an error record. 483 * 484 * @return String 485 * 486 * @since 1.0 487 */ 488 protected String getPersistErrorSQL() { 489 return this.PERSIST_ERROR_RECORD_SQL; 490 } 491 492 /*** 493 * Mark the source record complete. 494 * 495 * @param record the destination record. 496 * @param t 497 * @return boolean 498 * @throws ProcessingException 499 * 500 * @since 1.0 501 */ 502 protected boolean updateSourceForValidRecord( 503 Record record, 504 Transaction t) 505 throws ProcessingException { 506 PreparedStatement stmt = null; 507 boolean result = false; 508 try { 509 Connection conn = ((DatabaseTransaction) t).getConnection(); 510 if (conn == null) { 511 log.error("connection error."); 512 throw new ProcessingException( 513 "transaction does not contain " + "valid db connection."); 514 } 515 516 stmt = prepareStatementForValidSourceRecord(record, conn); 517 518 int rowcount = stmt.executeUpdate(); 519 if (rowcount > 0) { 520 result = true; 521 } 522 return result; 523 } 524 catch (SQLException e) { 525 log().error( 526 "JDBCOutputAdapter.updateSourceForValidRecord() " 527 + "SQLException thrown. e = " 528 + e, 529 e); 530 log().error("record = " + record); 531 throw new ProcessingException( 532 "JDBCOutputAdapter.updateSourceForValidRecord() " 533 + "SQLException thrown. e = " 534 + e, 535 e); 536 } 537 finally { 538 // connection is managed by the Transaction & should not be closed 539 // within the handleValidRecords()/handleErrorRecords() methods. 540 // ... but close the statement. 541 if (stmt != null) { 542 try { 543 stmt.close(); 544 } 545 catch (SQLException e) { 546 log.error("sqlexception closing db statement.", e); 547 } 548 } // if 549 } // finally 550 } 551 552 /*** 553 * Get SQL, prepare SQL statement, and set all the error fields. 554 * 555 * @param record 556 * @param conn 557 * @return PreparedStatement 558 * @throws SQLException 559 * @throws ProcessingException 560 * 561 * @since 1.0 562 */ 563 protected PreparedStatement prepareStatementForValidSourceRecord( 564 Record record, 565 Connection conn) 566 throws SQLException, ProcessingException { 567 568 String sql = getUpdateSQLForValidSourceRecord(); 569 if (sql == null) { 570 log().error( 571 "JDBCOutputAdapter.prepareStatementForValidSourceRecord(): " 572 + "SQL for updating valid record missing."); 573 throw new ProcessingException("No SQL provided to update valid record."); 574 } 575 576 PreparedStatement stmt = conn.prepareStatement(sql); 577 578 setStatementForValidSourceRecord(record, stmt); 579 580 return stmt; 581 } 582 583 /*** 584 * Set the appropriate fields on the prepared statement 585 * Not abstract so that sub-classes that don't use prepareErrorStatement 586 * need not implement this w/ a no-op, but if anyone who _should_ implement 587 * it doesn't, it will "fail fast" & let them know immediately. 588 * 589 * @param record 590 * @param stmt 591 * @throws SQLException 592 * @throws ProcessingException 593 * 594 * @since 1.0 595 */ 596 protected void setStatementForValidSourceRecord( 597 Record record, 598 PreparedStatement stmt) 599 throws SQLException, ProcessingException { 600 throw new ProcessingException( 601 "setStatementForValidSourceRecord() called, " 602 + "but not implemented on JDBCOutputAdapter."); 603 } 604 605 /*** 606 * Return the SQL for updating the source record corresponding to 607 * the given output record. 608 * 609 * @return String 610 * 611 * @since 1.0 612 */ 613 protected String getUpdateSQLForValidSourceRecord() { 614 return this.UPDATE_SOURCE_VALID_SQL; 615 } 616 617 /*** 618 * Mark the source record complete. 619 * 620 * @param record the destination record. 621 * @param t 622 * @return boolean 623 * @throws ProcessingException 624 * 625 * @since 1.0 626 */ 627 protected boolean updateSourceForErrorRecord( 628 Record record, 629 Transaction t) 630 throws ProcessingException { 631 PreparedStatement stmt = null; 632 boolean result = false; 633 try { 634 Connection conn = ((DatabaseTransaction) t).getConnection(); 635 if (conn == null) { 636 log.error("connection error."); 637 throw new ProcessingException( 638 "transaction does not contain " + "valid db connection."); 639 } 640 641 stmt = prepareStatementForErrorSourceRecord(record, conn); 642 643 int rowcount = stmt.executeUpdate(); 644 if (rowcount > 0) { 645 /* Where this is a recoverable error for valid records, 646 * it is considered fatal for error records because 647 * there is no reasonable way of handling it. 648 */ 649 result = true; 650 } 651 return result; 652 } 653 catch (SQLException e) { 654 log().error( 655 "JDBCOutputAdapter.updateSourceForErrorRecord() " 656 + "SQLException thrown. e = " 657 + e, 658 e); 659 log().error("record = " + record); 660 throw new ProcessingException( 661 "JDBCOutputAdapter.updateSourceForErrorRecord() " 662 + "SQLException thrown. e = " 663 + e, 664 e); 665 } 666 finally { 667 // connection is managed by the Transaction & should not be closed 668 // within the handleValidRecords()/handleErrorRecords() methods. 669 // ... but close the statement. 670 if (stmt != null) { 671 try { 672 stmt.close(); 673 } 674 catch (SQLException e) { 675 log.error("sqlexception closing db statement.", e); 676 } 677 } // if 678 } // finally 679 } 680 681 /*** 682 * Get SQL, prepare SQL statement, and set all the error fields. 683 * 684 * @param record 685 * @param conn 686 * @return PreparedStatement 687 * @throws SQLException 688 * @throws ProcessingException 689 * 690 * @since 1.0 691 */ 692 protected PreparedStatement prepareStatementForErrorSourceRecord( 693 Record record, 694 Connection conn) 695 throws SQLException, ProcessingException { 696 697 String sql = getUpdateSQLForErrorSourceRecord(); 698 if (sql == null) { 699 log().error( 700 "JDBCOutputAdapter.prepareStatementForErrorSourceRecord(): " 701 + "SQL for updating error record missing."); 702 throw new ProcessingException("No SQL provided to update error record."); 703 } 704 705 PreparedStatement stmt = conn.prepareStatement(sql); 706 707 setStatementForErrorSourceRecord(record, stmt); 708 709 return stmt; 710 } 711 712 /*** 713 * Set the appropriate fields on the prepared statement 714 * Not abstract so that sub-classes that don't use prepareErrorStatement 715 * need not implement this w/ a no-op, but if anyone who _should_ implement 716 * it doesn't, it will "fail fast" & let them know immediately. 717 * 718 * @param record 719 * @param stmt 720 * @throws SQLException 721 * @throws ProcessingException 722 * 723 * @since 1.0 724 */ 725 protected void setStatementForErrorSourceRecord( 726 Record record, 727 PreparedStatement stmt) 728 throws SQLException, ProcessingException { 729 throw new ProcessingException( 730 "setStatementForValidSourceRecord() " 731 + "called, but not implemented on JDBCOutputAdapter."); 732 } 733 734 /*** 735 * Return the SQL for updating the source record corresponding to 736 * the given output record. 737 * 738 * @return String 739 * 740 * @since 1.0 741 */ 742 protected String getUpdateSQLForErrorSourceRecord() { 743 return this.UPDATE_SOURCE_ERROR_SQL; 744 } 745 746 /*** 747 * Add error to valid record for a problem during output adapter processing. 748 * This method can be overridden to correctly add the program specific 749 * error to the record. By default, it does nothing. 750 * 751 * @param r 752 * 753 * @since 1.1 754 */ 755 protected void addErrorToRecord(Record r) { 756 // no op - override in concrete interface output adapter. 757 } 758 759 }

This page was automatically generated by Maven