View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc;
48
49 import com.rhi.architecture.logging.LogUtil;
50 import com.rhi.architecture.logging.Logger;
51 import com.rhi.architecture.config.ConfigurationException;
52 import com.rhi.architecture.resource.InitializationException;
53
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.ListIterator;
57
58 /***
59 * Abstract Pipeline class. Implements the basics. The process()
60 * method needs to be implemented by the concrete class.
61 *
62 * @author Pete McKinstry
63 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
64 *
65 * @since 1.0
66 */
67 public abstract class AbstractPipeline implements Pipeline {
68
69 // list of filters to be run by this pipeline.
70 private ArrayList filters = new ArrayList();
71 // the default general purpose channel implementation.
72 private Channel defaultChannel;
73 // the input channel for the pipeline.
74 private Channel inputChannel;
75 // the output channel for the pipeline.
76 // (changes as the pipeline is configured.)
77 private Channel finalChannel;
78 // the error channel for the pipeline.
79 private Channel errorChannel;
80
81 private ExceptionHandler handler;
82
83 private Logger log = null;
84
85 /***
86 * Constructor. takes single parameter equal to the
87 * number of threads of each filter. Defaults Channel
88 * to general purpose implementation
89 *
90 * @since 1.0
91 */
92 public AbstractPipeline() {
93 // no op
94 }
95
96 /***
97 * Set the default channel
98 *
99 * @param ch
100 * @throws InitializationException
101 * @since 1.1
102 */
103 public void init(Channel ch) throws InitializationException {
104 // initialize the logger. (first opportunity)
105 try {
106 log = LogUtil.getLogger();
107 if (log == null) {
108 throw new ConfigurationException("Logger initialization failed.");
109 }
110 }
111 catch (ConfigurationException e) {
112 throw new InitializationException("Unable to create logger.", e);
113 }
114
115 defaultChannel = ch;
116 inputChannel = (Channel) defaultChannel.clone();
117 errorChannel = (Channel) defaultChannel.clone();
118 }
119
120 /***
121 * Return the error channel for the pipeline. This can
122 * be used by every Filter for records that are errored,
123 * so that they don't clog up the main processing
124 * channels.
125 *
126 * @param handler - the exception handler to be used when reporting
127 * exceptions asynchronously.
128 *
129 * @since 1.0
130 */
131 public void setExceptionHandler(ExceptionHandler handler) {
132 this.handler = handler;
133 }
134 /***
135 * Returns the handler.
136 * @return ExceptionHandler
137 */
138 public ExceptionHandler getExceptionHandler() {
139 if (handler == null) {
140 this.handler = new ExceptionHandler();
141 }
142 return this.handler;
143 }
144
145 /***
146 * Return the inbound side of this pipeline. This is
147 * used to connect the input adapter to the pipeline.
148 *
149 * @return Channel - the input channel
150 *
151 * @since 1.0
152 */
153 public Channel getInputChannel() {
154 return this.inputChannel;
155 }
156
157 /***
158 * Return the outbound side of this pipeline. This is
159 * used to connect the output adapter to the pipeline.
160 *
161 * @return Channel - the outbound channel
162 *
163 * @since 1.0
164 */
165 public Channel getOutputChannel() {
166 return this.finalChannel != null
167 ? this.finalChannel
168 : this.inputChannel;
169 }
170
171 /***
172 * Return the error channel for the pipeline. This can
173 * be used by every Filter for records that are errored,
174 * so that they don't clog up the main processing
175 * channels.
176 *
177 * @return Channel - the error channel
178 *
179 * @since 1.0
180 */
181 public Channel getErrorChannel() {
182 return this.errorChannel;
183 }
184
185 /***
186 * Return the filters.
187 *
188 * @return List - Filters
189 *
190 * @since 1.0
191 */
192 protected List getFilters() {
193 return this.filters;
194 }
195
196 /***
197 * Add a new filter to the set. New filters are appended to the
198 * end of the list & run in that order.
199 * <br/>
200 * When a new filter is added to the set, a channel is created to
201 * connect the previously final filter to the new addition. The
202 * new filter is then connected to output adapter. (end of the line)
203 * When no channel is provided, a default general purpose channel is
204 * used.
205 *
206 * @param f the Filter to be added to collection
207 *
208 * @since 1.0
209 */
210 public void addFilter(Filter f) {
211 // use default channel
212 addFilter(f, (Channel) defaultChannel.clone());
213 }
214
215 /***
216 * Add a new filter to the set. New filters are appended to the
217 * end of the list & run in that order.
218 * <br/>
219 * An optional <code>Channel</code> parameter can be provided as
220 * the preferred Channel implementation for the outbound side of
221 * this filter.
222 *
223 * @param f The Filter to be added to collection
224 * @param c The Channel to be used on outbound side of
225 * this filter.
226 *
227 * @since 1.0
228 */
229 public void addFilter(Filter f, Channel c) {
230 // if it's the first, the process() method
231 // will set things up.
232 if (filters.size() > 0) {
233 f.setInbound(finalChannel);
234 }
235 else {
236 f.setInbound(inputChannel);
237 }
238 f.setOutbound(c); // recommended channel impl.
239 f.setErrorChannel(getErrorChannel()); // for errors.
240 finalChannel = c; // inbound side of next filter.
241 filters.add(f); // all set, add it.
242 }
243
244 /***
245 * Useful in clean shutdown when an error condition occured. Called from a
246 * concrete Pipeline.
247 *
248 * @param root - ThreadGroup root
249 */
250 protected void stopFilters(ThreadGroup root) {
251 ListIterator shutdownIterator = getFilters().listIterator();
252 while (shutdownIterator.hasNext()) {
253 Filter f = (Filter) shutdownIterator.next();
254 f.markForDeath();
255 }
256 // wait for them to exit.
257 while (root.activeCount() > 0) {
258 try {
259 Thread.sleep(500);
260 }
261 catch (InterruptedException e) {
262 // ignore
263 }
264 finally {
265 // empty
266 }
267 }
268 }
269
270 protected Logger log() {
271 return this.log;
272 }
273
274 }
This page was automatically generated by Maven