View Javadoc
/* ====================================================================
 * License:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by
 *        Robert Half International (http://www.rhi.com/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Parc", "RHI", and "Robert Half International" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact pete.mckinstry@rhi.com.
 *
 * 5. Products derived from this software may not be called "PARC",
 *    nor may "PARC" appear in their name, without prior written
 *    permission of Robert Half International.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 */
package com.rhi.architecture.parc;

import com.rhi.architecture.batch.AuditAgent;
import com.rhi.architecture.batch.AuditException;
import com.rhi.architecture.logging.LogUtil;
import com.rhi.architecture.logging.Logger;
import com.rhi.architecture.util.*;
import com.rhi.architecture.threads.ThreadPool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;


/**
 * The OutputAdapter is responsible for taking a completed
 * work set and storing it.  File or DB persistence
 * mechanisms will be the only initially supported output
 * adapter types.
 * <p>
 * This base class drains the valid and error channels in batches,
 * hands each batch to a task executed on a thread pool, and leaves the
 * actual persistence to {@link #handleValidRecords(Collection, Transaction)}
 * and {@link #handleErrorRecords(Collection, Transaction)}.
 *
 * @author Pete McKinstry
 *
 * Copyright 2002, Robert Half Int'l., Inc.  All rights reserved.
 *
 * @since 1.0
 */
public abstract class AbstractOutputAdapter
        implements OutputAdapter, Monitor {

    // Assigned in init(); shared by all adapter instances.
    private static Logger log = null;

    private static final String MAX_CNT_TAG = "oa_max_rec_count";
    private static final String DEFAULT_RECORD_COUNT = "500";
    private static final String MAX_THREAD_CNT_TAG = "oa_max_threads";
    private static final String DEFAULT_THREAD_COUNT = "1";

    private static final String MAX_SLEEP_TAG =
            "AbstractOutputAdapter.MaxSleep";
    private static final String DEFAULT_MAX_SLEEP = "2500"; // 2.5 seconds

    private AuditAgent auditAgent;
    private String txTypeName;

    // number of records to persist @ a time.
    private int commitSize;

    // Thread pool for the output adapter
    private ThreadPool myThreadPool = null;
    // maximum number of threads in the thread pool
    private int maxThreads;

    private Channel outputChannel;
    private Channel errorChannel;
    private ExceptionHandler handler;

    // Upper bound, in milliseconds, on a single idle wait in run().
    private int sleepTime;

    private volatile boolean done;


    /**
     * Default constructor.
     *
     * @since 1.0
     */
    public AbstractOutputAdapter() {
    }


    /**
     * Set method to tell the OutputAdapter what audit
     * agent to use.  The output Adapter is responsible for
     * setting the audit counts when it persists the records.
     *
     * @param agent the audit agent whose audits are committed by
     *              {@link #flush()}
     *
     * @since 1.0
     */
    public final void setAuditAgent(AuditAgent agent) {
        this.auditAgent = agent;
    }

    /**
     * Get the audit agent.
     *
     * @return the agent previously passed to
     *         {@link #setAuditAgent(AuditAgent)}
     *
     * @since 1.0
     */
    public final AuditAgent getAuditAgent() {
        return auditAgent;
    }


    /**
     * Obtains the logger, reads the adapter configuration and creates
     * the thread pool.
     * <p>
     * Properties read: <code>oa_max_rec_count</code> (batch size,
     * default 500), <code>Transaction.TYPE_KEY</code> (transaction class
     * name, required), <code>oa_max_threads</code> (default 1) and
     * <code>AbstractOutputAdapter.MaxSleep</code> (idle wait in
     * milliseconds, default 2500).  Adapters implementing
     * <code>SingleThreadModel</code> always use a single thread.
     *
     * @param props configuration properties
     * @throws InitializationException if the logger cannot be obtained,
     *         a numeric property is not a valid integer, or the
     *         transaction type is missing
     *
     * @since 1.0
     */
    public void init( Properties props )
            throws InitializationException {
        try {
            log = LogUtil.getLogger();
        }
        catch (ConfigurationException e) {
            throw new InitializationException(e.toString(), e);
        }

        log.debug("init");

        commitSize = parseIntProperty(props, MAX_CNT_TAG,
                                      DEFAULT_RECORD_COUNT);

        // transaction type
        txTypeName = props.getProperty( Transaction.TYPE_KEY );
        if (txTypeName == null) {
            throw new InitializationException( Transaction.TYPE_KEY +
                    " missing from configuration properties.");
        }

        // Get maximum number of threads for the output adapter
        maxThreads = parseIntProperty(props, MAX_THREAD_CNT_TAG,
                                      DEFAULT_THREAD_COUNT);

        this.sleepTime = parseIntProperty(props, MAX_SLEEP_TAG,
                                          DEFAULT_MAX_SLEEP);

        // If the object implements SingleThreadModel then
        // default max threads to 1
        if ( this instanceof SingleThreadModel ) {
            maxThreads = 1;
        }
        myThreadPool = new ThreadPool(1, maxThreads);
    }

    /**
     * Reads an integer property, falling back to the given default.
     *
     * @param props        configuration properties
     * @param key          property name
     * @param defaultValue value used when the property is absent
     * @return the parsed integer
     * @throws InitializationException if the value is not an integer
     */
    private static int parseIntProperty( Properties props, String key,
                                         String defaultValue )
            throws InitializationException {
        String value = props.getProperty(key, defaultValue);
        try {
            return Integer.parseInt( value );
        }
        catch ( NumberFormatException nfe ) {
            throw new InitializationException(
                    "Invalid number for config value = " + key, nfe );
        }
    }


    /**
     * cleanup() is called by the BatchApplication during shutdown.
     * It closes the thread pool, if one was created.
     *
     * @since 1.0
     */
    public void cleanup() {
        if (log != null) {
            log.debug("running cleanup");
        }

        if (myThreadPool != null) {
            myThreadPool.close();
        }
    }


    /**
     * Do any required processing prior to completing the batch
     * cycle.  The flush() method is called by the strategy for
     * each execution cycle.  This differs from the cleanup
     * method, which is called only once upon application
     * shutdown.
     * <p/>
     * flush() is called by the run() method during each cycle.
     * This implementation commits the audits held by the audit agent.
     *
     * @throws ProcessingException Signals a fatal error.  The
     * application will shutdown if this exception is thrown.
     *
     * @since 1.0
     */
    public void flush() throws ProcessingException {
        try {
            auditAgent.commitAudits();
        }
        catch ( AuditException ae ) {
            log.error("AuditException thrown by commitAudits()", ae);
            throw new ProcessingException( ae.toString(), ae );
        }
    }

    /**
     * Registers this adapter as monitor of both channels, then
     * repeatedly writes and flushes until the adapter has been marked
     * complete or the exception handler reports an error.
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        try {
            // OA should be notified if records are dumped in either channel.
            this.outputChannel.registerMonitor(this);
            this.errorChannel.registerMonitor(this);
        }
        catch (Throwable t) {
            reportThrowable(t);
            return;
        }

        // 'localDone' variable is used to ensure that write() is
        // called once after the this.done variable is set to true.
        // This will force all the records in the output & error
        // channels to be flushed prior to exit.  Otherwise you have
        // a race condition where records could be added to the
        // channels after write() is called, but before the loop
        // resets.
        boolean localDone;
        do {
            localDone = this.done;
            try {
                write(this.outputChannel, this.errorChannel);
                flush();

                // if pipeline not complete, wait for notification of
                // more records.
                awaitMoreWork();
            }
            catch (ProcessingException e) {
                log().fatal("ProcessingException thrown in OutputAdapter. " +
                            "e = " + e, e);
                this.handler.reportException(e);
                e.printStackTrace();
            }
            catch (Throwable t) {
                reportThrowable(t);
            }
        }
        while ((!localDone) && !handler.hasError());
    }

    /**
     * Blocks until a notification arrives, the configured sleep time
     * elapses, or the adapter is marked complete.
     * <p>
     * The <code>done</code> flag is re-checked while holding the monitor
     * so that a markComplete() issued between the check and the wait
     * cannot be missed.  The wait is bounded by <code>sleepTime</code>
     * so that a channel notification delivered while write() was still
     * running only delays the next cycle instead of stalling it.  A
     * non-positive sleep time waits without a timeout.
     */
    private void awaitMoreWork() {
        if (this.done) {
            return;
        }
        log.trace("Empty cycle detected, calling wait()");

        // wait(0) means "no timeout"; wait(negative) would throw.
        long timeout = (sleepTime > 0) ? sleepTime : 0L;
        synchronized (this) {
            if (!this.done) {
                try {
                    wait(timeout);
                }
                catch (InterruptedException ignored) {
                    // Treated like a wake-up: the caller's loop
                    // re-evaluates the done flag and error state.
                }
            }
        }
    }

    /**
     * Logs an unexpected Throwable, hands it to the exception handler
     * wrapped in a ProcessingException and prints its stack trace.
     *
     * @param t the failure to report
     */
    private void reportThrowable( Throwable t ) {
        log().fatal("Throwable caught in AbstractOutputAdapter." +
                    "e = " + t, t);
        this.handler.reportException(new ProcessingException(t));
        t.printStackTrace();
    }

    /**
     * Persist valid and errored records to the appropriate data
     * store.  Depending on the concrete interface, the output
     * channel may or may not have errored records in it.
     * Output adapters should process the valid records first so
     * that any errors can be added to the error collection.
     *
     * @param ch holding bin for valid records waiting to be
     * persisted.
     * @param err holding bin for errored records
     * waiting to be persisted.
     *
     * @exception ProcessingException error occured
     * during write.  Signals a fatal error.  The application
     * will shutdown if this exception is thrown.
     *
     * @since 1.0
     */
    public void write( Channel ch, Channel err )
            throws ProcessingException {

        //1. write valid records
        writeValidRecords(ch);

        //2. write error records
        writeErrorRecords(err);

        // 3. do end of batch processing ( non record level )
        Transaction transaction = createTransaction();
        try {
            transaction.begin();
            completeBatch( transaction );
            transaction.commit();
        }
        catch (ProcessingException e) {
            log.error("ProcessingException", e);
            rollbackQuietly(transaction);
            throw e;
        }
        catch (TransactionException e) {
            log.error("TransactionException", e);
            rollbackQuietly(transaction);
            throw new ProcessingException( e.toString(), e );
        }
    }

    /**
     * Rolls the transaction back after a failure.  A failing rollback
     * is logged but not propagated, so the exception that triggered
     * the rollback is the one the caller sees.
     *
     * @param transaction the transaction to roll back
     */
    private void rollbackQuietly( Transaction transaction ) {
        try {
            transaction.rollback();
        }
        catch ( TransactionException te ) {
            log.error("TransactionException during rollback", te);
        }
    }

    /**
     * Waits for the tasks handed to the thread pool to finish.
     *
     * @throws ProcessingException if the calling thread is interrupted
     *         while waiting; the interrupt status is restored first
     */
    private void joinPool() throws ProcessingException {
        try {
            myThreadPool.join();
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to code further up the stack.
            Thread.currentThread().interrupt();
            throw new ProcessingException( e.toString(), e );
        }
    }

    /**
     * To Improve Performance, a thread pool is created and used.
     * From valid channel, oa_max_rec_count number of records are
     * retrieved and handed over to each thread. At any point of time
     * at maximum oa_max_threads number of thread will be running.
     * Once, all threads are handling work, main thread waits for all of
     * them to finish and then only processes next set of records.
     *
     * @param ch channel holding the valid records.
     * @throws ProcessingException if the channel is null, the wait for
     *         the pool is interrupted, or a task reported an error
     *
     * @since 1.0
     */
    public void writeValidRecords(Channel ch)
            throws ProcessingException {

        int vSize = 0;
        int threadCount = 0;
        // Collects failures from the tasks of this call only; distinct
        // from the adapter-level handler field.
        ExceptionHandler taskHandler = new ExceptionHandler();
        ValidRecordsTask prototype =
                new ValidRecordsTask(this, maxThreads, taskHandler);

        do {
            Collection valid = extractValidRecords( ch, commitSize );
            vSize = valid.size();
            if ( vSize > 0 ) {
                log.info("Thread Id - "+threadCount+
                         ":Processing a batch of "+ vSize + " valid records.");
                ValidRecordsTask validTask =
                        (ValidRecordsTask) prototype.clone();
                validTask.setRecords(valid);
                myThreadPool.execute(validTask);
                threadCount++;
                if ( threadCount == maxThreads ) {
                    log.debug("Valid: Waiting for join");
                    joinPool();
                    log.debug("Valid: Join finished");
                    threadCount = 0;
                    if (taskHandler.hasError()) {
                        taskHandler.rethrowException();
                    }
                }
            }
        } while ( vSize > 0 );

        if ( threadCount > 0 ) {
            log.debug("AfterValid: waiting for threads to join.");
            joinPool();
            log.debug("AfterValid: Finished waiting for threads to join.");
            if (taskHandler.hasError()) {
                taskHandler.rethrowException();
            }
        }
        else {
            log.debug("No threads waiting to be joined.");
        }
    }

    /**
     * To Improve Performance, a thread pool is created and used.
     * From error channel, oa_max_rec_count number of records are
     * retrieved and handed over to each thread. At any point of time
     * at maximum oa_max_threads number of thread will be running.
     * Once, all threads are handling work, main thread waits for all of
     * them to finish and then only processes next set of records.
     *
     * @param err channel holding the error records.
     * @throws ProcessingException if the channel is null, the wait for
     *         the pool is interrupted, or a task reported an error
     *
     * @since 1.0
     */
    public void writeErrorRecords(Channel err)
            throws ProcessingException {
        int eSize = 0;
        int threadCount = 0;
        // Collects failures from the tasks of this call only; distinct
        // from the adapter-level handler field.
        ExceptionHandler taskHandler = new ExceptionHandler();
        ErrorRecordsTask prototype =
                new ErrorRecordsTask(this, maxThreads, taskHandler);

        do {
            Collection errors = extractErrorRecords( err, commitSize );
            eSize = errors.size();
            if ( eSize > 0 ) {
                log.info("Thread Id - "+threadCount+
                         ":Processing a batch of "+ eSize + " error records.");
                ErrorRecordsTask errorTask =
                        (ErrorRecordsTask) prototype.clone();
                errorTask.setRecords(errors);
                myThreadPool.execute(errorTask);
                threadCount++;
                if ( threadCount == maxThreads ) {
                    log.debug("Error: Waiting for join");
                    joinPool();
                    log.debug("Error: Join finished");
                    if (taskHandler.hasError()) {
                        taskHandler.rethrowException();
                    }
                    threadCount = 0;
                }
            }
        } while ( eSize > 0 );

        if ( threadCount > 0 ) {
            log.debug("AfterError: waiting for threads to join.");
            joinPool();
            log.debug("AfterError: Finished waiting for threads to join.");
            if (taskHandler.hasError()) {
                taskHandler.rethrowException();
            }
        }
        else {
            log.debug("No threads waiting to be joined.");
        }
    }

    /**
     * Helper method that pulls up to one batch of records from the
     * valid channel into a new collection.
     *
     * @param ch        channel holding valid records.
     * @param batchSize number of records requested from the channel.
     * @return Collection the pulled records; empty when the channel
     *         returned none.
     * @throws ProcessingException if the channel is null.
     *
     * @since 1.0
     */
    private Collection extractValidRecords( Channel ch, int batchSize )
            throws ProcessingException {
        Collection records = new ArrayList();
        if ( ch == null ) {
            log.error("valid channel null. ");
            throw new ProcessingException("Valid channel null in " +
                                          "AbstractOutputAdapter");
        }

        Collection c = ch.pull( batchSize );
        records.addAll( c );
        log.debug( "found " + records.size() + " records in valid channel");

        return records;
    }


    /**
     * Defer handling of valid records to sub-classes.
     *
     * @param c valid records.
     * @param t transaction under which these records should
     * be committed.
     * @return Collection
     * @throws ProcessingException
     *
     * @since 1.0
     */
    protected abstract Collection handleValidRecords( Collection c,
                                                      Transaction t )
            throws ProcessingException;


    /**
     * Helper method that pulls up to one batch of records from the
     * error channel into a new collection.
     *
     * @param err       channel holding errored records.
     * @param batchSize number of records requested from the channel.
     * @return Collection the pulled records; empty when the channel
     *         returned none.
     * @throws ProcessingException if the channel is null.
     *
     * @since 1.0
     */
    private Collection extractErrorRecords( Channel err, int batchSize )
            throws ProcessingException {
        Collection errors = new ArrayList();
        if ( err == null ) {
            log.error("error channel null. ");
            throw new ProcessingException("Error channel null in " +
                                          "AbstractOutputAdapter");
        }

        Collection c = err.pull( batchSize );
        errors.addAll( c );

        log.debug( "found " + errors.size() + " records in error channel");

        return errors;
    }


    /**
     * Defer handling of errored records to sub-classes.
     *
     * @param c errored records.
     * @param t transaction under which these records should
     * be committed.
     * @throws ProcessingException
     *
     * @since 1.0
     */
    protected abstract void handleErrorRecords( Collection c, Transaction t )
            throws ProcessingException;

    /**
     * Do any non-record level processing required to finish this
     * batch cycle. An example of this is incrementing the file
     * sequence number after each batch (write()) is finished.
     * <p/>
     * Since this type of processing doesn't depend on records,
     * it should be called outside of the handle* methods.
     *
     * @param transaction transaction begun by write() for this step
     * @throws ProcessingException
     *
     * @since 1.1
     */
    protected void completeBatch( Transaction transaction )
            throws ProcessingException {
        // optional method. no-op in base class.
    }

    /**
     * No op by default.
     *
     * @see com.rhi.architecture.parc.OutputAdapter#close()
     *
     * @since 1.1
     */
    public void close() throws ProcessingException {
        // no op.  If you need it, override it.
    }

    /**
     * @see com.rhi.architecture.parc.OutputAdapter#setErrorChannel(Channel)
     */
    public void setErrorChannel(Channel ch) {
        this.errorChannel = ch;
    }

    /**
     * @see com.rhi.architecture.parc.OutputAdapter#setValidChannel(Channel)
     */
    public void setValidChannel(Channel ch) {
        this.outputChannel = ch;
    }

    /**
     * @see com.rhi.architecture.parc.OutputAdapter#setExceptionHandler(ExceptionHandler)
     */
    public void setExceptionHandler(ExceptionHandler h) {
        this.handler = h;
    }

    /**
     * Marks the adapter complete and wakes the run() loop so that it
     * performs its final cycle.
     *
     * @see com.rhi.architecture.parc.OutputAdapter#markComplete()
     */
    public void markComplete() {
        this.done = true;

        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Simple implementation of Monitor interface based on Thread
     * wait/notify mechanism.
     *
     * @param e the event being signalled; only logged here
     *
     * @see com.rhi.architecture.parc.Monitor#notify(Event)
     */
    public void notify(Event e) {
        log.trace("Notification received. " + e);
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Create a transaction of the appropriate type, i.e. a new instance
     * of the class named by the <code>Transaction.TYPE_KEY</code>
     * property read in init().
     *
     * @return Transaction
     * @throws ProcessingException if the class cannot be found,
     *         instantiated or accessed
     *
     * @since 1.0
     */
    public Transaction createTransaction() throws ProcessingException {
        try {
            Class txClass = Class.forName(txTypeName);
            Transaction tx = (Transaction) txClass.newInstance();
            return tx;
        }
        catch (ClassNotFoundException cnfe) {
            throw new ProcessingException("ClassNotFoundException", cnfe);
        }
        catch (InstantiationException ie) {
            throw new ProcessingException("InstantiationException", ie);
        }
        catch (IllegalAccessException iae) {
            throw new ProcessingException("IllegalAccessException", iae);
        }
    }

    /**
     * Returns the Logger
     *
     * @return Logger the logger obtained in init(), or null if init()
     *         has not run yet
     */
    public Logger log() {
        return log;
    }

}

This page was automatically generated by Maven