View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc; 48 49 import com.rhi.architecture.logging.LogUtil; 50 import com.rhi.architecture.logging.Logger; 51 import com.rhi.architecture.config.ConfigurationException; 52 import com.rhi.architecture.resource.InitializationException; 53 54 import java.util.ArrayList; 55 import java.util.List; 56 import java.util.ListIterator; 57 58 /*** 59 * Abstract Pipeline class. Implements the basics. The process() 60 * method needs to be implemented by the concrete class. 61 * 62 * @author Pete McKinstry 63 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 64 * 65 * @since 1.0 66 */ 67 public abstract class AbstractPipeline implements Pipeline { 68 69 // list of filters to be run by this pipeline. 70 private ArrayList filters = new ArrayList(); 71 // the default general purpose channel implementation. 72 private Channel defaultChannel; 73 // the input channel for the pipeline. 74 private Channel inputChannel; 75 // the output channel for the pipeline. 76 // (changes as the pipeline is configured.) 77 private Channel finalChannel; 78 // the error channel for the pipeline. 79 private Channel errorChannel; 80 81 private ExceptionHandler handler; 82 83 private Logger log = null; 84 85 /*** 86 * Constructor. takes single parameter equal to the 87 * number of threads of each filter. Defaults Channel 88 * to general purpose implementation 89 * 90 * @since 1.0 91 */ 92 public AbstractPipeline() { 93 // no op 94 } 95 96 /*** 97 * Set the default channel 98 * 99 * @param ch 100 * @throws InitializationException 101 * @since 1.1 102 */ 103 public void init(Channel ch) throws InitializationException { 104 // initialize the logger. (first opportunity) 105 try { 106 log = LogUtil.getLogger(); 107 if (log == null) { 108 throw new ConfigurationException("Logger initialization failed."); 109 } 110 } 111 catch (ConfigurationException e) { 112 throw new InitializationException("Unable to create logger.", e); 113 } 114 115 defaultChannel = ch; 116 inputChannel = (Channel) defaultChannel.clone(); 117 errorChannel = (Channel) defaultChannel.clone(); 118 } 119 120 /*** 121 * Return the error channel for the pipeline. This can 122 * be used by every Filter for records that are errored, 123 * so that they don't clog up the main processing 124 * channels. 125 * 126 * @param handler - the exception handler to be used when reporting 127 * exceptions asynchronously. 128 * 129 * @since 1.0 130 */ 131 public void setExceptionHandler(ExceptionHandler handler) { 132 this.handler = handler; 133 } 134 /*** 135 * Returns the handler. 136 * @return ExceptionHandler 137 */ 138 public ExceptionHandler getExceptionHandler() { 139 if (handler == null) { 140 this.handler = new ExceptionHandler(); 141 } 142 return this.handler; 143 } 144 145 /*** 146 * Return the inbound side of this pipeline. This is 147 * used to connect the input adapter to the pipeline. 148 * 149 * @return Channel - the input channel 150 * 151 * @since 1.0 152 */ 153 public Channel getInputChannel() { 154 return this.inputChannel; 155 } 156 157 /*** 158 * Return the outbound side of this pipeline. This is 159 * used to connect the output adapter to the pipeline. 160 * 161 * @return Channel - the outbound channel 162 * 163 * @since 1.0 164 */ 165 public Channel getOutputChannel() { 166 return this.finalChannel != null 167 ? this.finalChannel 168 : this.inputChannel; 169 } 170 171 /*** 172 * Return the error channel for the pipeline. This can 173 * be used by every Filter for records that are errored, 174 * so that they don't clog up the main processing 175 * channels. 176 * 177 * @return Channel - the error channel 178 * 179 * @since 1.0 180 */ 181 public Channel getErrorChannel() { 182 return this.errorChannel; 183 } 184 185 /*** 186 * Return the filters. 187 * 188 * @return List - Filters 189 * 190 * @since 1.0 191 */ 192 protected List getFilters() { 193 return this.filters; 194 } 195 196 /*** 197 * Add a new filter to the set. New filters are appended to the 198 * end of the list & run in that order. 199 * <br/> 200 * When a new filter is added to the set, a channel is created to 201 * connect the previously final filter to the new addition. The 202 * new filter is then connected to output adapter. (end of the line) 203 * When no channel is provided, a default general purpose channel is 204 * used. 205 * 206 * @param f the Filter to be added to collection 207 * 208 * @since 1.0 209 */ 210 public void addFilter(Filter f) { 211 // use default channel 212 addFilter(f, (Channel) defaultChannel.clone()); 213 } 214 215 /*** 216 * Add a new filter to the set. New filters are appended to the 217 * end of the list & run in that order. 218 * <br/> 219 * An optional <code>Channel</code> parameter can be provided as 220 * the preferred Channel implementation for the outbound side of 221 * this filter. 222 * 223 * @param f The Filter to be added to collection 224 * @param c The Channel to be used on outbound side of 225 * this filter. 226 * 227 * @since 1.0 228 */ 229 public void addFilter(Filter f, Channel c) { 230 // if it's the first, the process() method 231 // will set things up. 232 if (filters.size() > 0) { 233 f.setInbound(finalChannel); 234 } 235 else { 236 f.setInbound(inputChannel); 237 } 238 f.setOutbound(c); // recommended channel impl. 239 f.setErrorChannel(getErrorChannel()); // for errors. 240 finalChannel = c; // inbound side of next filter. 241 filters.add(f); // all set, add it. 242 } 243 244 /*** 245 * Useful in clean shutdown when an error condition occured. Called from a 246 * concrete Pipeline. 247 * 248 * @param root - ThreadGroup root 249 */ 250 protected void stopFilters(ThreadGroup root) { 251 ListIterator shutdownIterator = getFilters().listIterator(); 252 while (shutdownIterator.hasNext()) { 253 Filter f = (Filter) shutdownIterator.next(); 254 f.markForDeath(); 255 } 256 // wait for them to exit. 257 while (root.activeCount() > 0) { 258 try { 259 Thread.sleep(500); 260 } 261 catch (InterruptedException e) { 262 // ignore 263 } 264 finally { 265 // empty 266 } 267 } 268 } 269 270 protected Logger log() { 271 return this.log; 272 } 273 274 }

This page was automatically generated by Maven