View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc.filter;
48
49 import com.rhi.architecture.parc.ArrayListQueue;
50 import com.rhi.architecture.parc.Channel;
51 import com.rhi.architecture.parc.Consumer;
52 import com.rhi.architecture.parc.ExceptionHandler;
53 import com.rhi.architecture.parc.Filter;
54 import com.rhi.architecture.parc.ProcessingException;
55 import com.rhi.architecture.parc.Supplier;
56 import com.rhi.architecture.resource.InitializationException;
57
58 import java.util.Collections;
59 import java.util.Iterator;
60 import java.util.LinkedList;
61 import java.util.List;
62 import java.util.Properties;
63
64 /***
65 * FilterSet. Composite element of GOF pattern.
66 * Note: Since it's just a container, it has been marked as
67 * final. No sub-classing should be required.
68 *
69 * @author Pete McKinstry
70 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
71 *
72 * @since 1.0
73 */
74 public final class FilterSet implements Filter {
75
76 // the list of filters that should be run when this set is
77 // processed.
78 private List filters;
79 // the inbound side of this filter set.
80 private Supplier supplier;
81 // the outbound side of this filter set.
82 private Consumer consumer;
83 // Channel for storage of errored records.
84 private Consumer errors;
85 // the default general purpose channel implementation.
86 private Channel defaultChannel;
87 // the outbound side of the last filter added. used
88 // as the inbound side of the next filter.
89 private Channel previousChannel;
90
91 // for reporting fatal errors from a thread.
92 private ExceptionHandler handler;
93
94 /***
95 * Default constructor.
96 *
97 * @since 1.0
98 */
99 public FilterSet() {
100 filters = Collections.synchronizedList(new LinkedList());
101 // for a default, use a concurrency safe, linked list
102 defaultChannel = new ArrayListQueue();
103 }
104
105 /***
106 * Initialize any local resources
107 *
108 * @param p
109 * @throws InitializationException
110 * @since 1.0
111 */
112 public void init(Properties p) throws InitializationException {
113 // nothing to do.
114 }
115
116 /***
117 * Return the name of this Filter.
118 * @return String
119 */
120 public String getName() {
121 return "FilterSet";
122 }
123
124 /***
125 * Add a new filter to the set. New filters are appended to the
126 * end of the list & run in that order.
127 * <br/>
128 * When a new filter is added to the set, a channel is created to
129 * connect the previously final filter to the new addition. When
130 * no channel is provided, a default general purpose channel is
131 * used.
132 *
133 * @param f Filter to be added to collection
134 *
135 * @since 1.0
136 */
137 public void addNode(Filter f) {
138 // use default channel
139 addNode(f, (Channel) defaultChannel.clone());
140 }
141
142 /***
143 * Add a new filter to the set. New filters are appended to the
144 * end of the list & run in that order.
145 * <br/>
146 * An optional <code>Channel</code> parameter can be provided as
147 * the preferred Channel implementation for the outbound side of
148 * this filter.
149 *
150 * @param f Filter to be added to collection
151 * @param c Channel to be used on outbound side of this filter.
152 *
153 * @since 1.0
154 */
155 public void addNode(Filter f, Channel c) {
156 // if it's the first, the process() method
157 // will set things up.
158 if (filters.size() > 0) {
159 f.setInbound(previousChannel);
160 }
161 f.setOutbound(c); // recommended channel impl.
162 previousChannel = c; // inbound side of next filter.
163 filters.add(f); // all set, add it.
164 }
165
166 /***
167 * do filter processing.
168 * @throws ProcessingException
169 */
170 public void process() throws ProcessingException {
171 if ((supplier == null)
172 || (consumer == null)
173 || (filters.size() == 0)) {
174 throw new ProcessingException("filter set not properly configured.");
175 }
176 // setup (can't use getFirst & getLast because
177 ((Filter) filters.get(0)).setInbound(supplier);
178 ((Filter) filters.get(filters.size() - 1)).setOutbound(consumer);
179
180 // run all the child filters.
181 Iterator iter = filters.iterator();
182 Filter f = null;
183 while (iter.hasNext()) {
184 f = (Filter) iter.next();
185 f.setErrorChannel(errors);
186 f.markForDeath();
187 f.process();
188 }
189 }
190
191 /***
192 * markForDeath sets a flag telling the Filter to shutdown at the
193 * next available opportunity. The filterset does not cycle
194 * continuously, so the implementation is a no-op.
195 *
196 * @since 1.1.
197 */
198 public void markForDeath() {
199 // no op
200 }
201
202 /***
203 * Reset the filter so that it's ready to process records again.
204 * For the FilterSet, no reset logic is necessary.
205 *
206 * @since 1.1
207 */
208 public void reset() {
209 // no op
210 }
211
212 /***
213 * Runnable interface
214 */
215 public void run() {
216 try {
217 this.process();
218 }
219 catch (ProcessingException pe) {
220 getExceptionHandler().reportException(pe);
221 }
222 }
223
224 /***
225 * Set the inbound delivery mechanism. The first filter
226 * in the set is hooked up to the input mechanism. (adapter)
227 *
228 * @param s
229 *
230 * @since 1.0
231 */
232 public void setInbound(Supplier s) {
233 this.supplier = s;
234 }
235
236 /***
237 * Set the outbound delivery mechanism. The final filter
238 * in the set is hooked up to the output mechanism. (adapter)
239 *
240 * @param c
241 *
242 * @since 1.0
243 */
244 public void setOutbound(Consumer c) {
245 this.consumer = c;
246 }
247
248 /***
249 * Set the holding bin used for errored records..
250 *
251 * @param err
252 *
253 * @since 1.0
254 */
255 public void setErrorChannel(Consumer err) {
256 this.errors = err;
257 }
258
259 /***
260 * Set the exception handler mechanism.
261 *
262 * @param handler
263 *
264 * @since 1.0
265 */
266 public void setExceptionHandler(ExceptionHandler handler) {
267 this.handler = handler;
268 }
269 /***
270 * @return the exception handler
271 */
272 public ExceptionHandler getExceptionHandler() {
273 return this.handler;
274 }
275
276 /***
277 * Return the suggested number of threads to launch for this
278 * Filter. Obviously this requires that the application uses
279 * a Pipeline that implements multi-threading.
280 * <br/>
281 * The FilterSet is a single threaded, serial processing
282 * composite Filter. If you wish to multi-thread, add the
283 * Filters to the pipe individually and use a MT pipeline
284 * implementation.
285 *
286 * @return 1 - run only 1 thread for this FilterSet.
287 *
288 * @since 1.1
289 */
290 public int numThreads() {
291 return 1;
292 }
293 }
This page was automatically generated by Maven