View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc;
48
49 import com.rhi.architecture.batch.AuditAgent;
50 import com.rhi.architecture.batch.ConfigMixinUtils;
51 import com.rhi.architecture.logging.LogUtil;
52 import com.rhi.architecture.logging.Logger;
53 import com.rhi.architecture.config.ConfigurationException;
54 import com.rhi.architecture.resource.InitializationException;
55 import com.rhi.architecture.resource.ResourceContext;
56
57 import java.util.Properties;
58
59 /***
60 * The PipelineProcess is a process instance that runs a pipeline
61 * (InputAdapter <-> Pipeline <-> OutputAdapter). It may run
62 * within a set of processes or by itself.
63 *
64 *
65 * @author Pete McKinstry
66 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
67 *
68 * @since 1.0
69 */
70 public class PipelineProcess implements Process {
71
72 // Logger implementation. Log4J or otherwise.
73 private static Logger log = null;
74 static {
75 try {
76 log = LogUtil.getLogger();
77 }
78 catch (ConfigurationException e) {
79 throw new ExceptionInInitializerError(
80 "failure loading logger.e = " + e);
81 }
82 }
83
84 // unique name for this Pipeline process
85 private String name;
86 private Properties props;
87
88 // Concrete classes for the following member attributes are
89 // loaded from a property file and instantiated via reflection.
90 private ExecutionStrategy strategy;
91 private OutputAdapter outputAdapter;
92 private InputAdapter inputAdapter;
93 private AuditAgent auditAgent;
94
95 /***
96 * Constructor
97 */
98 public PipelineProcess() {
99 super();
100 }
101
102 /***
103 * Init method will be called prior to run to allow the
104 * strategy to initialize itself & acquire any necessary
105 * resources.
106 * @param props
107 * @throws InitializationException
108 * @since 1.0
109 */
110 public void init(Properties props) throws InitializationException {
111 this.props = props;
112
113 /*
114 * Override the parent program name w/ the current processes
115 * name so that you load the correct configuration.
116 * Process has become a pseudo-program/application.
117 */
118 props.setProperty(PARCConfiguration.PROG_NAME, getName());
119
120 // Load properties (w/ the new prog_name)
121 ConfigMixinUtils.loadAllMixins(props);
122
123 /*
124 * Re-Initialize the ResourceContext in case new resources
125 * are required.
126 */
127 ResourceContext ctx = new ResourceContext();
128 ctx.init(props);
129
130 /*
131 * Create & initialize the audit agent
132 * specific to this process.
133 */
134 String agentType = props.getProperty("AuditAgent",
135 "com.rhi.architecture.batch.NullAuditAgent");
136 auditAgent = AuditAgent.getInstance(agentType);
137 auditAgent.init(props);
138
139 /*
140 * Create & configure the pipeline components
141 */
142 configurePipeline(props);
143
144 }
145
146 /***
147 * Loads the appropriate pipeline components based on the config
148 * settings. It calls getPipeline() to deal w/ the Filter details,
149 * but maintains control over the input & output adapters as
150 * well as the execution strategy.
151 *
152 * @param props
153 * @throws InitializationException
154 *
155 * @since 1.0
156 */
157 private void configurePipeline(Properties props)
158 throws InitializationException {
159 try {
160 log.info("process name = " + getName());
161
162 // 1. set strategy.
163 // a default strategy is provided, but you should always
164 // set your own just to be double sure.
165 String strategyClassName =
166 props.getProperty(
167 "ExecutionStrategy",
168 "com.rhi.architecture.parc.SingleCallStrategy");
169 Class strategyClass = Class.forName(strategyClassName);
170 strategy = (ExecutionStrategy) strategyClass.newInstance();
171 strategy.init(props);
172
173 // 2. create the correct input adapter
174 String iAdapterClassName = props.getProperty("InputAdapter");
175 log.info("input adapter = " + iAdapterClassName);
176
177 Class iAdapterClass = Class.forName(iAdapterClassName);
178 inputAdapter = (InputAdapter) iAdapterClass.newInstance();
179 inputAdapter.init(props);
180
181 // 3. create the correct output adapter
182 String oAdapterClassName = props.getProperty("OutputAdapter");
183 Class oAdapterClass = Class.forName(oAdapterClassName);
184 outputAdapter = (OutputAdapter) oAdapterClass.newInstance();
185 outputAdapter.init(props);
186
187 inputAdapter.setAuditAgent(getAuditAgent());
188 outputAdapter.setAuditAgent(getAuditAgent());
189
190 // 5. Initialize the strategy object w/ all the proper
191 // components.
192 strategy.setInputAdapter(getInputAdapter());
193 strategy.setOutputAdapter(getOutputAdapter());
194 strategy.setPipeline(getPipeline());
195 }
196 catch (ClassNotFoundException cnfe) {
197 throw new InitializationException("ClassNotFoundException", cnfe);
198 }
199 catch (InstantiationException ie) {
200 throw new InitializationException("InstantiationException", ie);
201 }
202 catch (IllegalAccessException iae) {
203 throw new InitializationException("IllegalAccessException", iae);
204 }
205 }
206
207 /***
208 * This method should construct a Pipeline, a set of
209 * Filters, attach those Filters to the Pipeline, and
210 * return the fully configured Pipeline to the framework
211 * for execution.
212 * <p />
213 * A default implementation has been provided here for the
214 * simplest case. It is driven from property file settings
215 * and assumes default constructors exist for both the
216 * Pipeline and all the Filters. It is anticipated that this
217 * will be to restrictive for many non-trivial applications.
218 * In those cases, this method should be overridden and
219 * implemented appropriately.
220 *
221 * @return Pipeline
222 * @throws InitializationException
223 * @since 1.0
224 */
225 public Pipeline getPipeline() throws InitializationException {
226
227 try {
228 Properties props = getProperties();
229 // get the channel class, use ArrayList as the default.
230 String defaultChannel =
231 props.getProperty(
232 "Channel",
233 "com.rhi.architecture.parc.ArrayListQueue");
234 Class channelClass = Class.forName(defaultChannel);
235 Channel channel = (Channel) channelClass.newInstance();
236
237 // get the pipeline class - no default.
238 String pipelineClassName = props.getProperty("Pipeline",
239 "com.rhi.architecture.parc.SerialMTPipeline");
240 Class pipelineClass = Class.forName(pipelineClassName);
241 Pipeline pipeline = (Pipeline) pipelineClass.newInstance();
242 pipeline.init(channel);
243
244 // Filters will be added to the pipeline in numeric order,
245 // until a number is missing. The numbering starts w/ 0.
246 int count = 0;
247 while (true) {
248 String nextName = "Filter" + count;
249 String filterClassName =
250 props.getProperty(nextName, "not found");
251 if (filterClassName.equals("not found")) {
252 break;
253 }
254 log.info("loading " + nextName + " = " + filterClassName);
255 Class filterClass = Class.forName(filterClassName);
256 Filter filter = (Filter) filterClass.newInstance();
257 filter.init(props);
258 pipeline.addFilter(filter);
259 count++;
260 }
261
262 return pipeline;
263 }
264 catch (ClassNotFoundException cnfe) {
265 throw new InitializationException("ClassNotFoundException", cnfe);
266 }
267 catch (InstantiationException ie) {
268 throw new InitializationException("InstantiationException", ie);
269 }
270 catch (IllegalAccessException iae) {
271 throw new InitializationException("IllegalAccessException", iae);
272 }
273 }
274
275 /***
276 * Run the Process. For PipelineProcess, this will
277 * call strategy.run() & then outputAdapter.close()
278 *
279 * @exception ProcessingException
280 *
281 * @since 1.0
282 */
283 public void run() throws ProcessingException {
284 log.debug("run()");
285 try {
286 strategy.run();
287
288 // after all the batches are processed, do any work
289 // that must happen once & only once per interface run.
290 outputAdapter.close();
291 }
292 catch (ProcessingException pe) {
293 log.error("ProcessingException thrown.", pe);
294 throw new ProcessingException(pe.toString(), pe);
295 }
296 }
297
298 /***
299 * Perform any cleanup required. This allows the Process to
300 * keep resources open after the run() method in case a
301 * multi-call strategy is used to execute run() repeatedly.
302 *
303 * @since 1.0
304 */
305 public void cleanup() {
306 // run cleanup on input adapter & set reference to null.
307 if (inputAdapter != null) {
308 inputAdapter.cleanup();
309 inputAdapter = null;
310 }
311 // run cleanup on output adapter & set reference to null.
312 if (outputAdapter != null) {
313 outputAdapter.cleanup();
314 outputAdapter = null;
315 }
316 // audit agent
317 if (auditAgent != null) {
318 auditAgent = null;
319 }
320 // exec strategy
321 if (strategy != null) {
322 strategy.cleanup();
323 strategy = null;
324 }
325 //clear the properties hashtable.
326 props.clear();
327
328 /*
329 * Implementation Note: The ResourceContext should not be
330 * cleared here so that it can be shared by Process objects.
331 * It is finally cleaned up by the BatchApplication when
332 * all processes are complete.
333 */
334 }
335
336 /***
337 * getName returns the configured name for this process. This
338 * method must return a unique identifier for each process
339 * running w/in the same BatchApplication. It is used to
340 * identify properties that are specific to a given Process.
341 *
342 * @return String
343 *
344 * @since 1.0
345 */
346 public String getName() {
347 if (name == null) {
348 String tmp = ProcessSet.class.getName();
349 int idx = tmp.lastIndexOf(".");
350 return tmp.substring(idx + 1);
351 }
352 return name;
353 }
354 /***
355 * setName is called by either the ProcessSet if this Process is
356 * contained w/in a ProcessSet to provide it w/ a unique name for
357 * property lookups. (name precedes properties specific to the
358 * process)
359 *
360 * @param name
361 *
362 * @since 1.0
363 */
364 public void setName(String name) {
365 this.name = name;
366 }
367
368 /***
369 * The interface application defers execution to a strategy
370 * interface.
371 *
372 * @return ExecutionStrategy
373 *
374 * @since 1.0
375 */
376 public ExecutionStrategy getExecutionStrategy() {
377 return strategy;
378 }
379 /***
380 * Set the strategy
381 *
382 * @param strategy
383 *
384 * @since 1.0
385 */
386 public void setExecutionStrategy(ExecutionStrategy strategy) {
387 this.strategy = strategy;
388 }
389
390 /***
391 * Return the Output Adapter that persists the work set
392 * after processing is completed.
393 *
394 * @return OutputAdapter
395 *
396 * @since 1.0
397 */
398 public OutputAdapter getOutputAdapter() {
399 return outputAdapter;
400 }
401 /***
402 * Set the output adapter
403 *
404 * @param outputAdapter
405 *
406 * @since 1.0
407 */
408 public void setOutputAdapter(OutputAdapter outputAdapter) {
409 this.outputAdapter = outputAdapter;
410 }
411
412 /***
413 * Return the Input Adapter that selects the work set to
414 * process. Typically this adapter will query the database
415 * or file system for a group of records to transform.
416 *
417 * @return InputAdapter
418 *
419 * @since 1.0
420 */
421 public InputAdapter getInputAdapter() {
422 return inputAdapter;
423 }
424 /***
425 * Set the input adapter
426 *
427 * @param inputAdapter
428 *
429 * @since 1.0
430 */
431 public void setInputAdapter(InputAdapter inputAdapter) {
432 this.inputAdapter = inputAdapter;
433 }
434
435 /***
436 * Returns the auditAgent.
437 * @return AuditAgent
438 */
439 public AuditAgent getAuditAgent() {
440 return auditAgent;
441 }
442 /***
443 * Sets the auditAgent.
444 * @param auditAgent The auditAgent to set
445 */
446 public void setAuditAgent(AuditAgent auditAgent) {
447 this.auditAgent = auditAgent;
448 }
449
450 /***
451 * get the configuration properties
452 *
453 * @return Properties
454 *
455 * @since 1.0
456 */
457 public Properties getProperties() {
458 return props;
459 }
460
461 /***
462 * Returns the logger instance
463 * @return Logger
464 */
465 public static Logger log() {
466 return log;
467 }
468
469 }
This page was automatically generated by Maven