View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc;
48
49 import com.rhi.architecture.batch.AuditAgent;
50 import com.rhi.architecture.batch.AuditException;
51 import com.rhi.architecture.logging.LogUtil;
52 import com.rhi.architecture.logging.Logger;
53 import com.rhi.architecture.util.*;
54 import com.rhi.architecture.threads.ThreadPool;
55
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.Properties;
59
60
61 /***
62 * The OutputAdapter is responsible for taking a completed
63 * work set and storing it. File or DB persistence
64 * mechanisms will be the only initially supported output
65 * adapter types.
66 *
67 * @author Pete McKinstry
68 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
69 *
70 * @since 1.0
71 */
72 public abstract class AbstractOutputAdapter
73 implements OutputAdapter, Monitor {
74
75 private static Logger log = null;
76
77 private static final String MAX_CNT_TAG = "oa_max_rec_count";
78 private static final String DEFAULT_RECORD_COUNT = "500";
79 private static final String MAX_THREAD_CNT_TAG = "oa_max_threads";
80 private static final String DEFAULT_THREAD_COUNT = "1";
81
82 private AuditAgent auditAgent;
83 private String txTypeName;
84
85 // number of records to persist @ a time.
86 private int COMMIT_SIZE;
87
88 // Thread pool for the output adapter
89 private ThreadPool myThreadPool = null;
90 // maximum number of threads in the thread pool
91 private int MAX_THREADS;
92
93 private Channel outputChannel;
94 private Channel errorChannel;
95 private ExceptionHandler handler;
96
97 private static final String MAX_SLEEP_TAG =
98 "AbstractOutputAdapter.MaxSleep";
99 private static final String DEFAULT_MAX_SLEEP = "2500"; // 2.5 seconds
100 private int sleepTime;
101
102 private volatile boolean done;
103
104
105 /***
106 * Default constructor.
107 *
108 * @since 1.0
109 */
110 public AbstractOutputAdapter() {
111 }
112
113
114 /***
115 * Set method to tell the OutputAdapter what audit
116 * agent to use. The output Adapter is responsible for
117 * setting the audit counts when it persists the records.
118 *
119 * @param AuditAgent
120 *
121 * @since 1.0
122 */
123 public final void setAuditAgent(AuditAgent agent) {
124 this.auditAgent = agent;
125 }
126 /***
127 * get audit agent
128 * @return AuditAgent
129 *
130 * @since 1.0
131 */
132 public final AuditAgent getAuditAgent() {
133 return auditAgent;
134 }
135
136
137 /***
138 * Load error logfile & create writer.
139 *
140 * @since 1.0
141 */
142 public void init( Properties props )
143 throws InitializationException {
144 try {
145 log = LogUtil.getLogger();
146 }
147 catch (ConfigurationException e) {
148 throw new InitializationException(e.toString(), e);
149 }
150
151 log.debug("init");
152 try {
153 String rec_count = props.getProperty(MAX_CNT_TAG,
154 DEFAULT_RECORD_COUNT);
155 COMMIT_SIZE = Integer.parseInt( rec_count );
156 // transaction type
157 txTypeName = props.getProperty( Transaction.TYPE_KEY );
158 if (txTypeName == null) {
159 throw new InitializationException( Transaction.TYPE_KEY +
160 " missing from configuration
161 properties.");
162 }
163 }
164 catch ( NumberFormatException nfe ) {
165 throw new InitializationException(
166 "Invalid number for config value = " + MAX_CNT_TAG, nfe );
167 }
168
169 // Get maximum number of threads for the output adapter
170 try {
171 String thread_count = props.getProperty(MAX_THREAD_CNT_TAG,
172 DEFAULT_THREAD_COUNT);
173 MAX_THREADS = Integer.parseInt( thread_count );
174 }
175 catch ( NumberFormatException nfe ) {
176 throw new InitializationException(
177 "Invalid number for config value = " + MAX_THREAD_CNT_TAG, nfe );
178 }
179
180 try {
181 String sleepValue = props.getProperty(MAX_SLEEP_TAG,
182 DEFAULT_MAX_SLEEP);
183 this.sleepTime = Integer.parseInt( sleepValue );
184 }
185 catch (NumberFormatException e) {
186 throw new InitializationException(
187 "Invalid number for config value = " + MAX_SLEEP_TAG, e );
188 }
189
190 // If the object implements SingleThreadModel then
191 // default max threads to 1
192 if ( this instanceof SingleThreadModel ) {
193 MAX_THREADS = 1;
194 }
195 myThreadPool = new ThreadPool(1,MAX_THREADS);
196 }
197
198
199 /***
200 * cleanup() is called by the BatchApplication during shutdown.
201 * It closes the errorLog writer and threadpool.
202 *
203 * @since 1.0
204 */
205 public void cleanup() {
206 if (log!=null) {
207 log.debug("running cleanup");
208 }
209
210 if (myThreadPool != null) {
211 myThreadPool.close();
212 }
213 }
214
215
216 /***
217 * Do any required processing prior to completing the batch
218 * cycle. The flush() method is called by the strategy for
219 * each execution cycle. This differs from the cleanup
220 * method, which is called only once upon application
221 * shutdown.
222 * <p/>
223 * flush() is called by the run() method during each cycle.
224 *
225 * @throws ProcessingException Signals a fatal error. The
226 * application will shutdown if this exception is thrown.
227 *
228 * @since 1.0
229 */
230 public void flush() throws ProcessingException {
231 try {
232 auditAgent.commitAudits();
233 }
234 catch ( AuditException ae ) {
235 log.error("AuditException thrown by commitAudits()", ae);
236 throw new ProcessingException( ae.toString(), ae );
237 }
238 }
239
240 /***
241 * @see java.lang.Runnable#run()
242 */
243 public void run() {
244 try {
245 // OA should be notified if records are dumped in either channel.
246 this.outputChannel.registerMonitor(this);
247 this.errorChannel.registerMonitor(this);
248 }
249 catch (Throwable t) {
250 log().fatal("Throwable caught in AbstractOutputAdapter." +
251 "e = " + t, t);
252 this.handler.reportException(new ProcessingException(t));
253 t.printStackTrace();
254 return;
255 }
256
257 // 'localDone' variable is used to ensure that write() is
258 // called once after the this.done variable is set to true.
259 // This will force all the records in the output & error
260 // channels to be flushed prior to exit. Otherwise you have
261 // a race condition where records could be added to the
262 // channels after write() is called, but before the loop
263 // resets.
264 boolean localDone;
265 do {
266 localDone = this.done;
267 try {
268 write(this.outputChannel, this.errorChannel);
269 flush();
270
271 // if pipeline not complete, wait for notification of more records.
272 if (!this.done) {
273 try {
274 log.trace("Empty cycle detected, calling wait()");
275 synchronized (this) {
276 wait();
277 }
278 }
279 catch (InterruptedException e) {
280 // ignore
281 }
282 }
283 }
284 catch (ProcessingException e) {
285 log().fatal("ProcessingException thrown in OutputAdapter. " +
286 "e = " + e, e);
287 this.handler.reportException(e);
288 e.printStackTrace();
289 }
290 catch (Throwable t) {
291 log().fatal("Throwable caught in AbstractOutputAdapter." +
292 "e = " + t, t);
293 this.handler.reportException(new ProcessingException(t));
294 t.printStackTrace();
295 }
296 }
297 while ((!localDone) && (handler.hasError()==false));
298 }
299
300 /***
301 * Persist valid and errored records to the appropriate data
302 * store. Depending on the concrete interface, the output
303 * channel may or may not have errored records in it.
304 * Output adapters should process the valid records first so
305 * that any errors can be added to the error collection.
306 *
307 * @param Channel holding bin for valid records waiting to be
308 * persisted.
309 * @param Channel holding bin for errored records
310 * waiting to be persisted.
311 *
312 * @exception ProcessingException error occured
313 * during write. Signals a fatal error. The application
314 * will shutdown if this exception is thrown.
315 *
316 * @since 1.0
317 */
318 public void write( Channel ch, Channel err )
319 throws
320 Processin
321 gExceptio
322 n {
323
324 //1. write valid records
325 writeValidRecords(ch);
326
327 //2. write error records
328 writeErrorRecords(err);
329
330 // 3. do end of batch processing ( non record level )
331 Transaction transaction = createTransaction();;
332 try {
333 transaction.begin();
334 completeBatch( transaction );
335 transaction.commit();
336 }
337 catch (ProcessingException e) {
338 log.error("ProcessingException", e);
339 try {
340 transaction.rollback();
341 }
342 catch ( TransactionException te ) {
343 // ignore
344 }
345 throw e;
346 }
347 catch (TransactionException e) {
348 log.error("TransactionException", e);
349 try {
350 transaction.rollback();
351 }
352 catch ( TransactionException te ) {
353 // ignore
354 }
355 throw new ProcessingException( e.toString(), e );
356 }
357 }
358
359 /***
360 *
361 * To Improve Performance, a thread pool is created and used.
362 * From valid channel, oa_max_rec_count number of records are
363 * retrieved and handed over to each thread. At any point of time
364 * at maximum oa_max_threads number of thread will be running.
365 * Once, all threads are handling work, main thread waits for all of
366 * them to finish and then only processes next set of records.
367 *
368 * @param Channel valid records.
369 *
370 * @since 1.0
371 */
372 public void writeValidRecords(Channel ch)
373 throws ProcessingException {
374
375 int vSize = 0;
376 int threadCount = 0;
377 ExceptionHandler handler = new ExceptionHandler();
378 ValidRecordsTask prototype =
379 new ValidRecordsTask(this, MAX_THREADS, handler);
380
381 do {
382 Collection valid = null;
383 valid = extractValidRecords( ch, COMMIT_SIZE );
384 vSize = valid.size();
385 if ( vSize > 0 ) {
386 log.info("Thread Id - "+threadCount+
387 ":Processing a batch of "+ vSize + " valid records.");
388 ValidRecordsTask validTask =
389 (ValidRecordsTask) prototype.clone();
390 validTask.setRecords(valid);
391 myThreadPool.execute(validTask);
392 threadCount++;
393 if ( threadCount == MAX_THREADS ) {
394 log.debug("Valid: Waiting for join");
395 try {
396 myThreadPool.join();
397 }
398 catch (InterruptedException e) {
399
400 throw new ProcessingException( e.toString(), e );
401
402 }
403 log.debug("Valid: Join finished");
404 threadCount = 0;
405 if (handler.hasError()) {
406 handler.rethrowException();
407 }
408 }
409 }
410 } while ( vSize > 0 );
411
412 if ( threadCount > 0 ) {
413 log.debug("AfterValid: waiting for threads to join.");
414 try {
415 myThreadPool.join();
416 }
417 catch (InterruptedException e) {
418 throw new
419 ProcessingException( e.toString(), e );
420 }
421
422 log.debug("AfterValid: Finished waiting for threads to join.");
423 if (handler.hasError()) {
424 handler.rethrowException();
425 }
426 }
427 else {
428 log.debug("No threads waiting to be joined.");
429 }
430 }
431
432 /***
433 *
434 * To Improve Performance, a thread pool is created and used.
435 * From error channel, oa_max_rec_count number of records are
436 * retrieved and handed over to each thread. At any point of time
437 * at maximin oa_max_threads number of thread will be running.
438 * Once, all threads are handling work, main thread waits for all of
439 * them to finish and then only processes next set of records.
440 *
441 * @param Channel error records.
442 *
443 * @since 1.0
444 */
445 public void writeErrorRecords(Channel err)
446 throws ProcessingException {
447 int eSize = 0;
448 int threadCount = 0;
449 ExceptionHandler handler = new ExceptionHandler();
450 ErrorRecordsTask prototype =
451 new ErrorRecordsTask(this, MAX_THREADS, handler);
452
453 do {
454 Collection errors = null;
455 errors = extractErrorRecords( err, COMMIT_SIZE );
456 eSize = errors.size();
457 if ( eSize > 0 ) {
458 log.info("Thread Id - "+threadCount+
459 ":Processing a batch of "+ eSize + " error records.");
460 ErrorRecordsTask errorTask =
461 (ErrorRecordsTask) prototype.clone();
462 errorTask.setRecords(errors);
463 myThreadPool.execute(errorTask);
464 threadCount++;
465 if ( threadCount == MAX_THREADS ) {
466 log.debug("Error: Waiting for join");
467 try {
468 myThreadPool.join();
469 }
470 catch (InterruptedException e) {
471
472 throw new ProcessingException( e.toString(), e );
473
474 }
475 log.debug("Error: Join finished");
476 if (handler.hasError()) {
477 handler.rethrowException();
478 }
479 threadCount = 0;
480 }
481 }
482 } while (eSize > 0 );
483
484 if ( threadCount > 0 ) {
485 log.debug("AfterError: waiting for threads to join.");
486 try {
487 myThreadPool.join();
488 }
489 catch (InterruptedException e) {
490 throw new
491 ProcessingException( e.toString(), e );
492 }
493
494 log.debug("AfterError: Finished waiting for threads to join.");
495 if (handler.hasError()) {
496 handler.rethrowException();
497 }
498 }
499 else {
500 log.debug("No threads waiting to be joined.");
501 }
502 }
503
504 /***
505 * Helper method for converting the error channel into a collection
506 *
507 * @param Channel errored records.
508 * @return Collection errored records as a collection.
509 *
510 * @since 1.0
511 */
512 private Collection extractValidRecords( Channel ch, int batchSize )
513 throws
514 Processin
515 gExceptio
516 n {
517 Collection records = new ArrayList();
518 if ( ch == null ) {
519 log.error("valid channel null. ");
520 throw new ProcessingException("Valid channel null in " +
521 "AbstractOutputAdapter");
522 }
523
524 Collection c = ch.pull( batchSize );
525 records.addAll( c );
526 log.debug( "found " + records.size() + " records in valid channel");
527
528 return records;
529 }
530
531
532 /***
533 * Defer handling of valid records to sub-classes..
534 *
535 * @param Collection valid records.
536 * @param Transaction transaction under which these records should
537 * be committed.
538 * @return Collection
539 * @throws ProcessingException
540 *
541 * @since 1.0
542 */
543 protected abstract Collection handleValidRecords( Collection c,
544 Transaction t )
545 throws
546 Processin
547 gExceptio
548 n;
549
550
551 /***
552 * Helper method for converting the error channel into a collection
553 *
554 * @param Channel errored records.
555 * @return Collection errored records as a collection.
556 *
557 * @since 1.0
558 */
559 private Collection extractErrorRecords( Channel err, int batchSize )
560 throws
561 Processin
562 gExceptio
563 n {
564 Collection errors = new ArrayList();
565 if ( err == null ) {
566 log.error("error channel null. ");
567 throw new ProcessingException("Error channel null in " +
568 "AbstractOutputAdapter");
569 }
570
571 Collection c = err.pull( batchSize );
572 errors.addAll( c );
573
574 log.debug( "found " + errors.size() + " records in error channel");
575
576 return errors;
577 }
578
579
580 /***
581 * Defer handling of valid records to sub-classes..
582 *
583 * @param Collection valid records.
584 * @param Transaction transaction under which these records should
585 * be committed.
586 * @return Collection
587 * @throws ProcessingException
588 *
589 * @since 1.0
590 */
591 protected abstract void handleErrorRecords( Collection c, Transaction t )
592 throws ProcessingException;
593
594 /***
595 * Do any non-record level processing required to finish this
596 * batch cycle. An example of this is incrementing the file
597 * sequence number after each batch (write()) is finished.
598 * <p/>
599 * Since this type of processing doesn't depend on records,
600 * it should be called outside of the handle* methods.
601 *
602 * @param transaction
603 * @throws ProcessingException
604 *
605 * @since 1.1
606 */
607 protected void completeBatch( Transaction transaction )
608 throws
609 ProcessingException {
610 // optional method. no-op in base class.
611 }
612
613 /***
614 * No op by default.
615 *
616 * @see com.rhi.architecture.parc.OutputAdapter#close()
617 *
618 * @since 1.1
619 */
620 public void close() throws ProcessingException {
621 // no op. If you need it, override it.
622 }
623
624 /***
625 * @see com.rhi.architecture.parc.OutputAdapter#setErrorChannel(Channel)
626 */
627 public void setErrorChannel(Channel ch) {
628 this.errorChannel = ch;
629 }
630
631 /***
632 * @see com.rhi.architecture.parc.OutputAdapter#setValidChannel(Channel)
633 */
634 public void setValidChannel(Channel ch) {
635 this.outputChannel = ch;
636 }
637
638 /***
639 * @see com.rhi.architecture.parc.OutputAdapter#setExceptionHandler(ExceptionHandler)
640 */
641 public void setExceptionHandler(ExceptionHandler h) {
642 this.handler = h;
643 }
644
645 /***
646 * @see com.rhi.architecture.parc.OutputAdapter#markComplete()
647 */
648 public void markComplete() {
649 this.done = true;
650
651 synchronized(this) {
652 notifyAll();
653 }
654 }
655
656 /***
657 * Simple implementation of Monitor interface based on Thread
658 * wait/notify mechanism.
659 *
660 * @see com.rhi.architecture.parc.Monitor#notify(Event)
661 */
662 public void notify(Event e) {
663 log.trace("Notification received. " + e);
664 synchronized(this) {
665 notifyAll();
666 }
667 }
668
669 /***
670 * Create a transaction of the appropriate type.
671 *
672 * @return Transaction
673 *
674 * @since 1.0
675 */
676 public Transaction createTransaction() throws ProcessingException {
677 try {
678 Class txClass = Class.forName(txTypeName);
679 Transaction tx = (Transaction)txClass.newInstance();
680 return tx;
681 }
682 catch (ClassNotFoundException cnfe) {
683 throw new ProcessingException("ClassNotFoundException", cnfe);
684 }
685 catch (InstantiationException ie) {
686 throw new ProcessingException("InstantiationException", ie);
687 }
688 catch (IllegalAccessException iae) {
689 throw new ProcessingException("IllegalAccessException", iae);
690 }
691 }
692
693 /***
694 * Returns the Logger
695 *
696 * @return Logger
697 */
698 public Logger log() {
699 return log;
700 }
701
702 }
703
704
This page was automatically generated by Maven