View Javadoc
/* ====================================================================
 * License:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by
 *        Robert Half International (http://www.rhi.com/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Parc", "RHI", and "Robert Half International" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact pete.mckinstry@rhi.com.
 *
 * 5. Products derived from this software may not be called "PARC",
 *    nor may "PARC" appear in their name, without prior written
 *    permission of Robert Half International.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 */
package com.rhi.architecture.parc;

import com.rhi.architecture.batch.AuditAgent;
import com.rhi.architecture.batch.ConfigMixinUtils;
import com.rhi.architecture.logging.LogUtil;
import com.rhi.architecture.logging.Logger;
import com.rhi.architecture.config.ConfigurationException;
import com.rhi.architecture.resource.InitializationException;
import com.rhi.architecture.resource.ResourceContext;

import java.util.Properties;

/**
 * The PipelineProcess is a process instance that runs a pipeline
 * (InputAdapter &lt;-&gt; Pipeline &lt;-&gt; OutputAdapter). It may run
 * within a set of processes or by itself.
 * <p>
 * The concrete execution strategy, adapters, channel, pipeline and
 * filters are named in the configuration properties and instantiated
 * reflectively through their no-argument constructors.
 *
 * @author Pete McKinstry
 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
 *
 * @since 1.0
 */
public class PipelineProcess implements Process {

    // Logger implementation. Log4J or otherwise. Assigned exactly once
    // by the static initializer below; class loading fails if the
    // logger cannot be obtained.
    private static final Logger log;
    static {
        try {
            log = LogUtil.getLogger();
        }
        catch (ConfigurationException e) {
            throw new ExceptionInInitializerError(
                "failure loading logger.e = " + e);
        }
    }

    // unique name for this Pipeline process
    private String name;
    // configuration handed to init(); note that init() mutates it and
    // cleanup() clears it.
    private Properties props;

    // Concrete classes for the following member attributes are
    // loaded from a property file and instantiated via reflection.
    private ExecutionStrategy strategy;
    private OutputAdapter outputAdapter;
    private InputAdapter inputAdapter;
    private AuditAgent auditAgent;

    /**
     * Constructor
     */
    public PipelineProcess() {
        super();
    }

    /**
     * Init method will be called prior to run to allow the
     * process to initialize itself and acquire any necessary
     * resources.
     * <p>
     * The supplied properties object is retained and modified: the
     * program-name entry is overwritten with {@link #getName()} before
     * the mixins are loaded.
     *
     * @param props configuration properties; must not be null
     * @throws InitializationException if a component cannot be
     *         created or initialized
     * @since 1.0
     */
    public void init(Properties props) throws InitializationException {
        this.props = props;

        /*
         * Override the parent program name w/ the current processes
         * name so that you load the correct configuration.
         * Process has become a pseudo-program/application.
         */
        props.setProperty(PARCConfiguration.PROG_NAME, getName());

        // Load properties (w/ the new prog_name)
        ConfigMixinUtils.loadAllMixins(props);

        /*
         * Re-Initialize the ResourceContext in case new resources
         * are required.
         */
        ResourceContext ctx = new ResourceContext();
        ctx.init(props);

        /*
         * Create & initialize the audit agent
         * specific to this process.
         */
        String agentType = props.getProperty("AuditAgent",
            "com.rhi.architecture.batch.NullAuditAgent");
        auditAgent = AuditAgent.getInstance(agentType);
        auditAgent.init(props);

        /*
         * Create & configure the pipeline components
         */
        configurePipeline(props);
    }

    /**
     * Loads the appropriate pipeline components based on the config
     * settings. It calls getPipeline() to deal w/ the Filter details,
     * but maintains control over the input and output adapters as
     * well as the execution strategy.
     * <p>
     * "ExecutionStrategy" has a default; "InputAdapter" and
     * "OutputAdapter" are required and their absence is reported as an
     * InitializationException.
     *
     * @param props configuration properties
     * @throws InitializationException if a required property is
     *         missing or a configured class cannot be loaded or
     *         instantiated
     *
     * @since 1.0
     */
    private void configurePipeline(Properties props)
        throws InitializationException {
        try {
            log.info("process name = " + getName());

            // 1. set strategy.
            // a default strategy is provided, but you should always
            // set your own just to be double sure.
            String strategyClassName =
                props.getProperty(
                    "ExecutionStrategy",
                    "com.rhi.architecture.parc.SingleCallStrategy");
            Class strategyClass = Class.forName(strategyClassName);
            strategy = (ExecutionStrategy) strategyClass.newInstance();
            strategy.init(props);

            // 2. create the correct input adapter (no default)
            String iAdapterClassName =
                requireProperty(props, "InputAdapter");
            log.info("input adapter = " + iAdapterClassName);

            Class iAdapterClass = Class.forName(iAdapterClassName);
            inputAdapter = (InputAdapter) iAdapterClass.newInstance();
            inputAdapter.init(props);

            // 3. create the correct output adapter (no default)
            String oAdapterClassName =
                requireProperty(props, "OutputAdapter");
            Class oAdapterClass = Class.forName(oAdapterClassName);
            outputAdapter = (OutputAdapter) oAdapterClass.newInstance();
            outputAdapter.init(props);

            // 4. hand the audit agent to both adapters. The accessors
            // are used (rather than the fields) so that subclass
            // overrides are honoured.
            inputAdapter.setAuditAgent(getAuditAgent());
            outputAdapter.setAuditAgent(getAuditAgent());

            // 5. Initialize the strategy object w/ all the proper
            // components.
            strategy.setInputAdapter(getInputAdapter());
            strategy.setOutputAdapter(getOutputAdapter());
            strategy.setPipeline(getPipeline());
        }
        catch (ClassNotFoundException cnfe) {
            throw new InitializationException("ClassNotFoundException", cnfe);
        }
        catch (InstantiationException ie) {
            throw new InitializationException("InstantiationException", ie);
        }
        catch (IllegalAccessException iae) {
            throw new InitializationException("IllegalAccessException", iae);
        }
    }

    /**
     * Looks up a property that has no default value.
     *
     * @param props configuration properties
     * @param key name of the required property
     * @return the configured value, never null
     * @throws InitializationException if the property is not set
     */
    private static String requireProperty(Properties props, String key)
        throws InitializationException {
        String value = props.getProperty(key);
        if (value == null) {
            String message = "required property '" + key + "' is not set";
            // A cause is supplied because the (String, Throwable) form
            // is the only InitializationException constructor this
            // class is known to have available.
            throw new InitializationException(
                message, new IllegalArgumentException(message));
        }
        return value;
    }

    /**
     * This method should construct a Pipeline, a set of
     * Filters, attach those Filters to the Pipeline, and
     * return the fully configured Pipeline to the framework
     * for execution.
     * <p>
     * A default implementation has been provided here for the
     * simplest case. It is driven from property file settings
     * and assumes default constructors exist for both the
     * Pipeline and all the Filters. It is anticipated that this
     * will be too restrictive for many non-trivial applications.
     * In those cases, this method should be overridden and
     * implemented appropriately.
     * <p>
     * Properties read: "Channel" and "Pipeline" (both have defaults),
     * then "Filter0", "Filter1", ... until the first missing number.
     *
     * @return Pipeline
     * @throws InitializationException if a configured class cannot be
     *         loaded or instantiated
     * @since 1.0
     */
    public Pipeline getPipeline() throws InitializationException {

        try {
            Properties props = getProperties();
            // get the channel class, use ArrayListQueue as the default.
            String channelClassName =
                props.getProperty(
                    "Channel",
                    "com.rhi.architecture.parc.ArrayListQueue");
            Class channelClass = Class.forName(channelClassName);
            Channel channel = (Channel) channelClass.newInstance();

            // get the pipeline class, use SerialMTPipeline as the
            // default.
            String pipelineClassName = props.getProperty("Pipeline",
                "com.rhi.architecture.parc.SerialMTPipeline");
            Class pipelineClass = Class.forName(pipelineClassName);
            Pipeline pipeline = (Pipeline) pipelineClass.newInstance();
            pipeline.init(channel);

            // Filters will be added to the pipeline in numeric order,
            // until a number is missing. The numbering starts w/ 0.
            for (int count = 0; ; count++) {
                String nextName = "Filter" + count;
                String filterClassName = props.getProperty(nextName);
                if (filterClassName == null) {
                    // first gap in the numbering ends the filter chain
                    break;
                }
                log.info("loading " + nextName + " = " + filterClassName);
                Class filterClass = Class.forName(filterClassName);
                Filter filter = (Filter) filterClass.newInstance();
                filter.init(props);
                pipeline.addFilter(filter);
            }

            return pipeline;
        }
        catch (ClassNotFoundException cnfe) {
            throw new InitializationException("ClassNotFoundException", cnfe);
        }
        catch (InstantiationException ie) {
            throw new InitializationException("InstantiationException", ie);
        }
        catch (IllegalAccessException iae) {
            throw new InitializationException("IllegalAccessException", iae);
        }
    }

    /**
     * Run the Process. For PipelineProcess, this will
     * call strategy.run() and then outputAdapter.close().
     * The output adapter is not closed when the strategy fails.
     *
     * @exception ProcessingException if the strategy or the output
     *            adapter reports a failure; the original exception is
     *            logged and attached as the cause
     *
     * @since 1.0
     */
    public void run() throws ProcessingException {
        log.debug("run()");
        try {
            strategy.run();

            // after all the batches are processed, do any work
            // that must happen once & only once per interface run.
            outputAdapter.close();
        }
        catch (ProcessingException pe) {
            log.error("ProcessingException thrown.", pe);
            throw new ProcessingException(pe.toString(), pe);
        }
    }

    /**
     * Perform any cleanup required. This allows the Process to
     * keep resources open after the run() method in case a
     * multi-call strategy is used to execute run() repeatedly.
     * <p>
     * Safe to call on an instance that was never initialized: every
     * component, including the properties, is skipped when null.
     *
     * @since 1.0
     */
    public void cleanup() {
        // run cleanup on input adapter & set reference to null.
        if (inputAdapter != null) {
            inputAdapter.cleanup();
            inputAdapter = null;
        }
        // run cleanup on output adapter & set reference to null.
        if (outputAdapter != null) {
            outputAdapter.cleanup();
            outputAdapter = null;
        }
        // audit agent: only the reference is dropped here.
        auditAgent = null;
        // exec strategy
        if (strategy != null) {
            strategy.cleanup();
            strategy = null;
        }
        // clear the properties hashtable. props is null when init()
        // was never called, so guard it like the other members.
        if (props != null) {
            props.clear();
        }

        /*
         * Implementation Note: The ResourceContext should not be
         * cleared here so that it can be shared by Process objects.
         * It is finally cleaned up by the BatchApplication when
         * all processes are complete.
         */
    }

    /**
     * getName returns the configured name for this process. This
     * method must return a unique identifier for each process
     * running w/in the same BatchApplication. It is used to
     * identify properties that are specific to a given Process.
     * <p>
     * When no name has been set, the unqualified class name of
     * ProcessSet is returned.
     *
     * @return String
     *
     * @since 1.0
     */
    public String getName() {
        if (name == null) {
            // NOTE(review): the fallback is derived from ProcessSet,
            // not from this class - possibly a copy/paste slip. Left
            // as is because init() uses this value as the program name
            // for configuration lookup; confirm before changing.
            String tmp = ProcessSet.class.getName();
            int idx = tmp.lastIndexOf(".");
            return tmp.substring(idx + 1);
        }
        return name;
    }

    /**
     * setName provides this Process w/ a unique name for property
     * lookups, for example when it is contained w/in a ProcessSet.
     * (name precedes properties specific to the process)
     *
     * @param name unique name for this process
     *
     * @since 1.0
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * The interface application defers execution to a strategy
     * interface.
     *
     * @return ExecutionStrategy
     *
     * @since 1.0
     */
    public ExecutionStrategy getExecutionStrategy() {
        return strategy;
    }

    /**
     * Set the strategy
     *
     * @param strategy
     *
     * @since 1.0
     */
    public void setExecutionStrategy(ExecutionStrategy strategy) {
        this.strategy = strategy;
    }

    /**
     * Return the Output Adapter that persists the work set
     * after processing is completed.
     *
     * @return OutputAdapter
     *
     * @since 1.0
     */
    public OutputAdapter getOutputAdapter() {
        return outputAdapter;
    }

    /**
     * Set the output adapter
     *
     * @param outputAdapter
     *
     * @since 1.0
     */
    public void setOutputAdapter(OutputAdapter outputAdapter) {
        this.outputAdapter = outputAdapter;
    }

    /**
     * Return the Input Adapter that selects the work set to
     * process. Typically this adapter will query the database
     * or file system for a group of records to transform.
     *
     * @return InputAdapter
     *
     * @since 1.0
     */
    public InputAdapter getInputAdapter() {
        return inputAdapter;
    }

    /**
     * Set the input adapter
     *
     * @param inputAdapter
     *
     * @since 1.0
     */
    public void setInputAdapter(InputAdapter inputAdapter) {
        this.inputAdapter = inputAdapter;
    }

    /**
     * Returns the auditAgent.
     * @return AuditAgent
     */
    public AuditAgent getAuditAgent() {
        return auditAgent;
    }

    /**
     * Sets the auditAgent.
     * @param auditAgent The auditAgent to set
     */
    public void setAuditAgent(AuditAgent auditAgent) {
        this.auditAgent = auditAgent;
    }

    /**
     * get the configuration properties
     *
     * @return Properties the object passed to init(), or null if
     *         init() has not been called
     *
     * @since 1.0
     */
    public Properties getProperties() {
        return props;
    }

    /**
     * Returns the logger instance
     * @return Logger
     */
    public static Logger log() {
        return log;
    }

}

This page was automatically generated by Maven