View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc.filter;
48
49 import com.rhi.architecture.logging.LogUtil;
50 import com.rhi.architecture.logging.Logger;
51 import com.rhi.architecture.parc.Consumer;
52 import com.rhi.architecture.parc.ExceptionHandler;
53 import com.rhi.architecture.parc.Filter;
54 import com.rhi.architecture.parc.PARCApplication;
55 import com.rhi.architecture.parc.ProcessingException;
56 import com.rhi.architecture.parc.Supplier;
57 import com.rhi.architecture.resource.InitializationException;
58 import com.rhi.architecture.config.ConfigurationException;
59
60 import java.util.Collection;
61 import java.util.Properties;
62
63 /***
64 * The AbstractFilter provides a partially implemented
65 * Filter allowing simpler concrete Filter objects. It
66 * deals w/ thread safe exit logic, and idle cycle
67 * detection as well as providing the required hooks for
68 * pre & post channel hookup. All concrete filter
69 * processing is deferred to a doWork() abstract method.
70 * <p/>
71 * Note: a simple run() method is provided to support
72 * the Runnable Interface.
73 *
74 * @author Pete McKinstry
75 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
76 *
77 * @since 1.0
78 */
79 public abstract class AbstractFilter implements Filter {
80
81 // for normal logging activities, use the log category.
82 private static Logger log = null;
83
84 // for the few cycle statistics messages that are logged,
85 // use the stats category.
86 private static Logger stats = null;
87
88 private Supplier supplier;
89 private Consumer consumer;
90 private Consumer errors;
91
92 // for reporting fatal errors in a thread.
93 private ExceptionHandler handler;
94
95 private int maxRecords = 20; // default
96 private int numThreads = 1; // default
97
98 private String name = null;
99
100 /*
101 the shutdown flag needs to be volatile to ensure that
102 each thread accesses the correct value. The flag is
103 used to trigger a thread exit.
104 */
105 private volatile boolean shutdown = false;
106
107 /***
108 * constructor
109 */
110 protected AbstractFilter() {
111 super();
112 }
113
114 /***
115 * constructor
116 *
117 * @param max - greatest number of records that this
118 * filter will accept from the channel in 1 batch. Used
119 * in conjunction w/ the thread count to performance tune
120 * the pipeline.
121 *
122 * @since 1.0
123 */
124 protected AbstractFilter(int max) {
125 this.maxRecords = max;
126 }
127
128 /***
129 * Initialize common Filter settings. Note: sub-classes may
130 * override this method, but they must call super.init() in
131 * addition to doing their own local initialization. If
132 * any of these values are already set, they will be over-
133 * written. If you don't want that to happen, do not provide
134 * a property setting for these attributes.
135 * </p>
136 * Sets the following common Filter attributes.
137 * 1) <ConcreteFilterClass>.max_records: the max number of
138 * records to be pulled from the channel during each work
139 * cycle.
140 * 2) <ConcreteFilterClass>.num_threads: the number of
141 * threads to run for this filter.
142 * If no setting is found, the default value will be used.
143 * - For threads, the default is 1.
144 * - For work sets, the default size is 20.
145 *
146 * @param properties
147 * @throws InitializationException
148 *
149 * @since 1.0
150 */
151 public void init(Properties properties)
152 throws InitializationException {
153 try {
154 log = LogUtil.getLogger();
155 stats = LogUtil.getLogger(PARCApplication.CYCLE_STATS);
156 }
157 catch (ConfigurationException e) {
158 throw new InitializationException(e.toString());
159 }
160
161 try {
162 String full_name = this.getClass().getName();
163 this.name = full_name.substring(full_name.lastIndexOf('.') + 1);
164 log.debug("Filter name = " + name);
165
166 String sizeStr =
167 properties.getProperty(this.name + ".max_records", "20");
168 this.maxRecords = Integer.parseInt(sizeStr);
169 log.debug("max_records = " + maxRecords);
170
171 String threadsStr =
172 properties.getProperty(this.name + ".num_threads", "1");
173 this.numThreads = Integer.parseInt(threadsStr);
174 log.debug("num_threads = " + numThreads);
175 }
176 catch (NumberFormatException nfe) {
177 log.error(
178 "configuration error. Filter settings "
179 + "provided, but not valid.",
180 nfe);
181 throw new InitializationException("Filter settings invalid", nfe);
182 }
183 }
184
185 /***
186 * Thread execution method. Inherited from Runnable
187 * All this method does is call process(). & catch
188 * the ProcessingException.
189 *
190 * @since 1.0
191 */
192 public void run() {
193 try {
194 process();
195 }
196 catch (ProcessingException pe) {
197 log.fatal(
198 "Processing exception caught in Filter "
199 + "<"
200 + getName()
201 + ">",
202 pe);
203
204 // run() is only called when a Thread is running the
205 // Filter. In that case, the error cannot be propagated
206 // up the call stack, so we report it using the
207 // ExceptionHandler class. The Pipeline will watch the
208 // ExceptionHandler class & shutdown if a fatal error
209 // has been detected. If the pipeline is running w/in
210 // a single thread, the run() method won't be called,
211 // and the ProcessingException will roll up the call
212 // stack as per normal execution.
213 getExceptionHandler().reportException(pe);
214
215 pe.printStackTrace();
216 }
217 catch (Throwable t) {
218 log.fatal(
219 "Non-PARC fatal exception caught in Filter "
220 + "<"
221 + getName()
222 + ">",
223 t);
224
225 getExceptionHandler().reportException(new ProcessingException(t));
226
227 t.printStackTrace();
228 }
229 }
230
231 /***
232 * Template method to facilitate simple Filter implementations.
233 * <p/>
234 * This method, just like process(), is part of the main thread
235 * loop and therefore must be thread safe.
236 *
237 * @param in
238 * @return
239 * @throws ProcessingException
240 */
241 protected abstract Collection doWork(Collection in)
242 throws ProcessingException;
243
244 /***
245 * Tell the filter to shutdown after detecting idle cycles.
246 * markForDeath sets an 'exit' flag on the Filter. The
247 * pipeline model assumes that a complete cycle leaves the
248 * pipeline empty, therefore, the markForDeath() is _lazy_
249 * in the sense that it only takes effect after a thread
250 * finds no more work in the inbound channel.
251 *
252 * @since 1.0
253 */
254 public void markForDeath() {
255 log.debug("Filter <" + getName() + "> marked for death....");
256 this.shutdown = true;
257
258 // notify any listeners that are waiting.
259 synchronized (this) {
260 notifyAll();
261 }
262 }
263 /***
264 * Reset the filter to ensure that it's ready to
265 * process records again. This method must be called
266 * after calling markForDeath() to reset the state
267 * of the Filter.
268 * <p/>
269 * Note: Although it's thread safe due to the volatile
270 * keyword, the reset() method is intended to be used
271 * @ the beginning/end of a pipeline execution cycle
272 * when no threads are actively executing the Filter.
273 *
274 * @since 1.1
275 */
276 public void reset() {
277 log.debug("reset called on Filter");
278 this.shutdown = false;
279 }
280 /***
281 * Returns the shutdown flag
282 * @return boolean
283 */
284 protected boolean isShutdown() {
285 return shutdown;
286 }
287
288 /***
289 * Set the inbound delivery mechanism.
290 *
291 * @param s
292 *
293 * @since 1.0
294 */
295 public void setInbound(Supplier s) {
296 this.supplier = s;
297 }
298 /***
299 * supplier configured on the inbound side of the filter.
300 * @return Supplier
301 */
302 public Supplier getInbound() {
303 return this.supplier;
304 }
305
306 /***
307 * Set the outbound delivery mechanism.
308 *
309 * @param c
310 *
311 * @since 1.0
312 */
313 public void setOutbound(Consumer c) {
314 this.consumer = c;
315 }
316 /***
317 * consumer configured on the outbound side of this filter.
318 * @return Consumer
319 */
320 public Consumer getOutbound() {
321 return this.consumer;
322 }
323
324 /***
325 * Set the error delivery mechanism.
326 *
327 * @param err
328 *
329 * @since 1.0
330 */
331 public void setErrorChannel(Consumer err) {
332 this.errors = err;
333 }
334 /***
335 * return error channel
336 * @return Consumer
337 */
338 public Consumer getErrorChannel() {
339 return this.errors;
340 }
341
342 /***
343 * Set the exception handler mechanism.
344 *
345 * @param handler
346 *
347 * @since 1.0
348 */
349 public void setExceptionHandler(ExceptionHandler handler) {
350 this.handler = handler;
351 }
352 /***
353 * return exception handler
354 * @return ExceptionHandler
355 */
356 public ExceptionHandler getExceptionHandler() {
357 return this.handler;
358 }
359
360 /***
361 * Easy implementation for sub-classes
362 * @return String
363 * @since 1.0
364 */
365 public String getName() {
366 return this.name;
367 }
368
369 /***
370 * Method setNumThreads.
371 * @param val
372 */
373 public void setNumThreads(int val) {
374 numThreads = val;
375 }
376 /***
377 * Return the suggested number of threads to launch for this
378 * Filter. Obviously this requires that the application uses
379 * a Pipeline that implements multi-threading. The number of
380 * threads is '1' by default, but can be altered by setting a
381 * configuration property = <ConcreteFilterName>.num_threads
382 * to be whatever value you wish.
383 *
384 * @return int the number of threads to launch for this Filter
385 *
386 * @since 1.1
387 */
388 public int numThreads() {
389 return this.numThreads;
390 }
391
392 /***
393 * Method setMaxRecords.
394 * @param val
395 */
396 public void setMaxRecords(int val) {
397 maxRecords = val;
398 }
399 /***
400 * Returns the maxRecords.
401 * @return int
402 */
403 public int getMaxRecords() {
404 return maxRecords;
405 }
406
407 /***
408 * Returns the Logger instance used for statistics.
409 *
410 * @return Logger
411 */
412 protected static Logger stats() {
413 return stats;
414 }
415
416 /***
417 * Returns the log.
418 * @return Logger
419 */
420 public static Logger log() {
421 return log;
422 }
423
424 }
This page was automatically generated by Maven