View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc.filter; 48 49 import com.rhi.architecture.logging.LogUtil; 50 import com.rhi.architecture.logging.Logger; 51 import com.rhi.architecture.parc.Consumer; 52 import com.rhi.architecture.parc.ExceptionHandler; 53 import com.rhi.architecture.parc.Filter; 54 import com.rhi.architecture.parc.PARCApplication; 55 import com.rhi.architecture.parc.ProcessingException; 56 import com.rhi.architecture.parc.Supplier; 57 import com.rhi.architecture.resource.InitializationException; 58 import com.rhi.architecture.config.ConfigurationException; 59 60 import java.util.Collection; 61 import java.util.Properties; 62 63 /*** 64 * The AbstractFilter provides a partially implemented 65 * Filter allowing simpler concrete Filter objects. It 66 * deals w/ thread safe exit logic, and idle cycle 67 * detection as well as providing the required hooks for 68 * pre & post channel hookup. All concrete filter 69 * processing is deferred to a doWork() abstract method. 70 * <p/> 71 * Note: a simple run() method is provided to support 72 * the Runnable Interface. 73 * 74 * @author Pete McKinstry 75 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 76 * 77 * @since 1.0 78 */ 79 public abstract class AbstractFilter implements Filter { 80 81 // for normal logging activities, use the log category. 82 private static Logger log = null; 83 84 // for the few cycle statistics messages that are logged, 85 // use the stats category. 86 private static Logger stats = null; 87 88 private Supplier supplier; 89 private Consumer consumer; 90 private Consumer errors; 91 92 // for reporting fatal errors in a thread. 93 private ExceptionHandler handler; 94 95 private int maxRecords = 20; // default 96 private int numThreads = 1; // default 97 98 private String name = null; 99 100 /* 101 the shutdown flag needs to be volatile to ensure that 102 each thread accesses the correct value. The flag is 103 used to trigger a thread exit. 104 */ 105 private volatile boolean shutdown = false; 106 107 /*** 108 * constructor 109 */ 110 protected AbstractFilter() { 111 super(); 112 } 113 114 /*** 115 * constructor 116 * 117 * @param max - greatest number of records that this 118 * filter will accept from the channel in 1 batch. Used 119 * in conjunction w/ the thread count to performance tune 120 * the pipeline. 121 * 122 * @since 1.0 123 */ 124 protected AbstractFilter(int max) { 125 this.maxRecords = max; 126 } 127 128 /*** 129 * Initialize common Filter settings. Note: sub-classes may 130 * override this method, but they must call super.init() in 131 * addition to doing their own local initialization. If 132 * any of these values are already set, they will be over- 133 * written. If you don't want that to happen, do not provide 134 * a property setting for these attributes. 135 * </p> 136 * Sets the following common Filter attributes. 137 * 1) <ConcreteFilterClass>.max_records: the max number of 138 * records to be pulled from the channel during each work 139 * cycle. 140 * 2) <ConcreteFilterClass>.num_threads: the number of 141 * threads to run for this filter. 142 * If no setting is found, the default value will be used. 143 * - For threads, the default is 1. 144 * - For work sets, the default size is 20. 145 * 146 * @param properties 147 * @throws InitializationException 148 * 149 * @since 1.0 150 */ 151 public void init(Properties properties) 152 throws InitializationException { 153 try { 154 log = LogUtil.getLogger(); 155 stats = LogUtil.getLogger(PARCApplication.CYCLE_STATS); 156 } 157 catch (ConfigurationException e) { 158 throw new InitializationException(e.toString()); 159 } 160 161 try { 162 String full_name = this.getClass().getName(); 163 this.name = full_name.substring(full_name.lastIndexOf('.') + 1); 164 log.debug("Filter name = " + name); 165 166 String sizeStr = 167 properties.getProperty(this.name + ".max_records", "20"); 168 this.maxRecords = Integer.parseInt(sizeStr); 169 log.debug("max_records = " + maxRecords); 170 171 String threadsStr = 172 properties.getProperty(this.name + ".num_threads", "1"); 173 this.numThreads = Integer.parseInt(threadsStr); 174 log.debug("num_threads = " + numThreads); 175 } 176 catch (NumberFormatException nfe) { 177 log.error( 178 "configuration error. Filter settings " 179 + "provided, but not valid.", 180 nfe); 181 throw new InitializationException("Filter settings invalid", nfe); 182 } 183 } 184 185 /*** 186 * Thread execution method. Inherited from Runnable 187 * All this method does is call process(). & catch 188 * the ProcessingException. 189 * 190 * @since 1.0 191 */ 192 public void run() { 193 try { 194 process(); 195 } 196 catch (ProcessingException pe) { 197 log.fatal( 198 "Processing exception caught in Filter " 199 + "<" 200 + getName() 201 + ">", 202 pe); 203 204 // run() is only called when a Thread is running the 205 // Filter. In that case, the error cannot be propagated 206 // up the call stack, so we report it using the 207 // ExceptionHandler class. The Pipeline will watch the 208 // ExceptionHandler class & shutdown if a fatal error 209 // has been detected. If the pipeline is running w/in 210 // a single thread, the run() method won't be called, 211 // and the ProcessingException will roll up the call 212 // stack as per normal execution. 213 getExceptionHandler().reportException(pe); 214 215 pe.printStackTrace(); 216 } 217 catch (Throwable t) { 218 log.fatal( 219 "Non-PARC fatal exception caught in Filter " 220 + "<" 221 + getName() 222 + ">", 223 t); 224 225 getExceptionHandler().reportException(new ProcessingException(t)); 226 227 t.printStackTrace(); 228 } 229 } 230 231 /*** 232 * Template method to facilitate simple Filter implementations. 233 * <p/> 234 * This method, just like process(), is part of the main thread 235 * loop and therefore must be thread safe. 236 * 237 * @param in 238 * @return 239 * @throws ProcessingException 240 */ 241 protected abstract Collection doWork(Collection in) 242 throws ProcessingException; 243 244 /*** 245 * Tell the filter to shutdown after detecting idle cycles. 246 * markForDeath sets an 'exit' flag on the Filter. The 247 * pipeline model assumes that a complete cycle leaves the 248 * pipeline empty, therefore, the markForDeath() is _lazy_ 249 * in the sense that it only takes effect after a thread 250 * finds no more work in the inbound channel. 251 * 252 * @since 1.0 253 */ 254 public void markForDeath() { 255 log.debug("Filter <" + getName() + "> marked for death...."); 256 this.shutdown = true; 257 258 // notify any listeners that are waiting. 259 synchronized (this) { 260 notifyAll(); 261 } 262 } 263 /*** 264 * Reset the filter to ensure that it's ready to 265 * process records again. This method must be called 266 * after calling markForDeath() to reset the state 267 * of the Filter. 268 * <p/> 269 * Note: Although it's thread safe due to the volatile 270 * keyword, the reset() method is intended to be used 271 * @ the beginning/end of a pipeline execution cycle 272 * when no threads are actively executing the Filter. 273 * 274 * @since 1.1 275 */ 276 public void reset() { 277 log.debug("reset called on Filter"); 278 this.shutdown = false; 279 } 280 /*** 281 * Returns the shutdown flag 282 * @return boolean 283 */ 284 protected boolean isShutdown() { 285 return shutdown; 286 } 287 288 /*** 289 * Set the inbound delivery mechanism. 290 * 291 * @param s 292 * 293 * @since 1.0 294 */ 295 public void setInbound(Supplier s) { 296 this.supplier = s; 297 } 298 /*** 299 * supplier configured on the inbound side of the filter. 300 * @return Supplier 301 */ 302 public Supplier getInbound() { 303 return this.supplier; 304 } 305 306 /*** 307 * Set the outbound delivery mechanism. 308 * 309 * @param c 310 * 311 * @since 1.0 312 */ 313 public void setOutbound(Consumer c) { 314 this.consumer = c; 315 } 316 /*** 317 * consumer configured on the outbound side of this filter. 318 * @return Consumer 319 */ 320 public Consumer getOutbound() { 321 return this.consumer; 322 } 323 324 /*** 325 * Set the error delivery mechanism. 326 * 327 * @param err 328 * 329 * @since 1.0 330 */ 331 public void setErrorChannel(Consumer err) { 332 this.errors = err; 333 } 334 /*** 335 * return error channel 336 * @return Consumer 337 */ 338 public Consumer getErrorChannel() { 339 return this.errors; 340 } 341 342 /*** 343 * Set the exception handler mechanism. 344 * 345 * @param handler 346 * 347 * @since 1.0 348 */ 349 public void setExceptionHandler(ExceptionHandler handler) { 350 this.handler = handler; 351 } 352 /*** 353 * return exception handler 354 * @return ExceptionHandler 355 */ 356 public ExceptionHandler getExceptionHandler() { 357 return this.handler; 358 } 359 360 /*** 361 * Easy implementation for sub-classes 362 * @return String 363 * @since 1.0 364 */ 365 public String getName() { 366 return this.name; 367 } 368 369 /*** 370 * Method setNumThreads. 371 * @param val 372 */ 373 public void setNumThreads(int val) { 374 numThreads = val; 375 } 376 /*** 377 * Return the suggested number of threads to launch for this 378 * Filter. Obviously this requires that the application uses 379 * a Pipeline that implements multi-threading. The number of 380 * threads is '1' by default, but can be altered by setting a 381 * configuration property = <ConcreteFilterName>.num_threads 382 * to be whatever value you wish. 383 * 384 * @return int the number of threads to launch for this Filter 385 * 386 * @since 1.1 387 */ 388 public int numThreads() { 389 return this.numThreads; 390 } 391 392 /*** 393 * Method setMaxRecords. 394 * @param val 395 */ 396 public void setMaxRecords(int val) { 397 maxRecords = val; 398 } 399 /*** 400 * Returns the maxRecords. 401 * @return int 402 */ 403 public int getMaxRecords() { 404 return maxRecords; 405 } 406 407 /*** 408 * Returns the Logger instance used for statistics. 409 * 410 * @return Logger 411 */ 412 protected static Logger stats() { 413 return stats; 414 } 415 416 /*** 417 * Returns the log. 418 * @return Logger 419 */ 420 public static Logger log() { 421 return log; 422 } 423 424 }

This page was automatically generated by Maven