View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc;
48
49 import com.rhi.architecture.config.ConfigurationException;
50 import com.rhi.architecture.logging.LogUtil;
51 import com.rhi.architecture.logging.Logger;
52
53 import java.util.Properties;
54
55 /***
56 * Abstract Execution Strategy
57 *
58 * @author Pete McKinstry
59 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
60 *
61 * @since 1.0
62 */
63 public abstract class AbstractExecutionStrategy
64 implements ExecutionStrategy {
65
66 private static Logger log = LogUtil.getLogger();
67
68 // for the few cycle statistics messages that are logged,
69 // use the stats category.
70 private static Logger stats = null;
71
72 /***
73 * Between execution cycles, sleep for this amount of time.
74 */
75 public static final String SLEEP_TIME = "ExecutionStrategy.sleepTime";
76 /***
77 * the default sleep time
78 */
79 public static final String DEFAULT_SLEEP_TIME = "0";
80
81 private long sleepTime;
82
83 private InputAdapter inputAdapter;
84 private OutputAdapter outputAdapter;
85 private Pipeline pipe;
86
87 private Properties properties;
88
89 private int SHUTDOWN_WAIT_TIME = 100; // msecs
90
91 /***
92 * @see com.rhi.architecture.resource.Resource#init(java.util.Properties)
93 * @param p
94 */
95 public void init(Properties p) {
96 this.properties = p;
97 String amount = p.getProperty(SLEEP_TIME, DEFAULT_SLEEP_TIME);
98 try {
99 this.sleepTime = Long.parseLong(amount);
100 log.debug("sleep time set to " + this.sleepTime);
101 }
102 catch (NumberFormatException e) {
103 log.error(SLEEP_TIME + " configuration value is not a number. " +
104 "a default value of zero will be used.", e);
105 this.sleepTime = 0;
106 }
107 }
108
109 /***
110 * @see com.rhi.architecture.parc.ExecutionStrategy#cleanup()
111 */
112 public void cleanup() {
113 // no op. can be implemented by sub-classes if they so desire.
114 }
115
116 /***
117 * @return Returns the sleepTime.
118 */
119 public long getSleepTime() {
120 return this.sleepTime;
121 }
122
123 /***
124 * Set method to tell the strategy what input adapter to use.
125 *
126 * @param ia
127 *
128 * @since 1.0
129 */
130 public void setInputAdapter(InputAdapter ia) {
131 this.inputAdapter = ia;
132 }
133 /***
134 * Get input adapter.
135 *
136 * @return InputAdapter
137 *
138 * @since 1.0
139 */
140 public InputAdapter getInputAdapter() {
141 return this.inputAdapter;
142 }
143
144 /***
145 * Set method to tell the strategy what output adapter to use.
146 *
147 * @param oa
148 *
149 * @since 1.0
150 */
151 public void setOutputAdapter(OutputAdapter oa) {
152 this.outputAdapter = oa;
153 }
154 /***
155 * Get output adapter.
156 *
157 * @return OutputAdapter
158 *
159 * @since 1.0
160 */
161 public OutputAdapter getOutputAdapter() {
162 return this.outputAdapter;
163 }
164
165 /***
166 * Set method to tell the strategy what pipeline to use.
167 *
168 * @param p
169 *
170 * @since 1.0
171 */
172 public void setPipeline(Pipeline p) {
173 this.pipe = p;
174 }
175 /***
176 * Get Pipeline
177 *
178 * @return Pipeline
179 *
180 * @since 1.0
181 */
182 public Pipeline getPipeline() {
183 return this.pipe;
184 }
185
186 /***
187 * ExecutionStrategies can use this method to run the
188 * pipeline using shared standard logic. They then wrap
189 * the doRun() method w/ whatever strategy they wish to
190 * apply while continuing to share the details.
191 *
192 * @exception ProcessingException
193 * @return int - the number of records processed.
194 *
195 * @since 1.0
196 */
197 protected int doRun() throws ProcessingException {
198
199 Properties p = getProperties();
200
201 int batchSize = 0;
202
203 long startTime = System.currentTimeMillis();
204 logStats("Pipeline started");
205
206 // Pipeline & OutputAdapter share an exception handler so that
207 // both exit immediately if a fatal error occurs.
208 ExceptionHandler handler = new ExceptionHandler();
209 getOutputAdapter().setExceptionHandler(handler);
210 getPipeline().setExceptionHandler(handler);
211
212 // set the input, output, and error channels on the adapters.
213 hookupChannels();
214
215 // retrieve input records, create audit record
216 batchSize = getInputAdapter().push(getPipeline().getInputChannel());
217 if (batchSize > 0) {
218 Thread oaThread = null;
219 String mode =
220 p.getProperty(
221 OutputAdapter.RUN_AS_THREAD_KEY,
222 OutputAdapter.DEFAULT_THREAD_MODE);
223 if (mode.equalsIgnoreCase(OutputAdapter.RUN_AS_THREAD)) {
224 oaThread = new Thread(getOutputAdapter());
225 oaThread.start();
226 }
227 // process records
228 getPipeline().process();
229 // ensure the pipeline is empty. (all records complete)
230 getPipeline().flush();
231
232 // tell it to complete the records in the channel & exit.
233 getOutputAdapter().markComplete();
234
235 if (mode.equalsIgnoreCase(OutputAdapter.RUN_AS_THREAD)) {
236 // wait for the output adapter to exit.
237 while (oaThread.isAlive()) {
238 try {
239 Thread.sleep(SHUTDOWN_WAIT_TIME);
240 }
241 catch (InterruptedException e) {
242 // ignore
243 }
244 }
245 }
246 else {
247 // run OA w/in this thread.
248 getOutputAdapter().run();
249 }
250
251 // Detect an abnormal exit & handle it correctly. (By
252 // propogating the exception.
253 if (handler.hasError()) {
254 handler.rethrowException();
255 }
256 }
257
258 long endTime = System.currentTimeMillis();
259 if (batchSize > 0) {
260 int rounded = (int) (endTime - startTime + 500) / 1000; // round
261 if (rounded == 0) {
262 ++rounded; // if 0 seconds, use 1.
263 }
264 logStats(
265 "Pipeline complete. Stats = "
266 + (batchSize / rounded)
267 + " recs/sec");
268 }
269 else {
270 logStats("Pipeline complete. Empty Batch.");
271 }
272
273 return batchSize;
274 }
275
276 /***
277 * Method configureChannels.
278 */
279 protected void hookupChannels() {
280 // Connect adapters to the Pipeline channels.
281 // -- InputAdapter may need access to the error channel
282 // -- in case of record parsing errors.
283 getInputAdapter().setErrorChannel(getPipeline().getErrorChannel());
284 getOutputAdapter().setValidChannel(getPipeline().getOutputChannel());
285 getOutputAdapter().setErrorChannel(getPipeline().getErrorChannel());
286 }
287
288 /***
289 * Get the configuration settings from the ConfigFacility
290 * @return Properties object storing configuration data
291 * @throws ProcessingException
292 */
293 protected Properties getProperties() throws ProcessingException {
294 return this.properties;
295 }
296
297 /***
298 * Log performance statistics w/ a different logger than normal
299 * application logging messages.
300 * @param msg
301 */
302 public void logStats(String msg) {
303 if (stats == null) {
304 try {
305 stats = LogUtil.getLogger(PARCApplication.CYCLE_STATS);
306 }
307 catch (ConfigurationException e) {
308 // ignore stats failures. Non-critical.
309 }
310 }
311 if (stats != null) {
312 stats.info(msg);
313 }
314 }
315
316 }
This page was automatically generated by Maven