View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc.filter; 48 49 import com.rhi.architecture.parc.ArrayListQueue; 50 import com.rhi.architecture.parc.Channel; 51 import com.rhi.architecture.parc.Consumer; 52 import com.rhi.architecture.parc.ExceptionHandler; 53 import com.rhi.architecture.parc.Filter; 54 import com.rhi.architecture.parc.ProcessingException; 55 import com.rhi.architecture.parc.Supplier; 56 import com.rhi.architecture.resource.InitializationException; 57 58 import java.util.Collections; 59 import java.util.Iterator; 60 import java.util.LinkedList; 61 import java.util.List; 62 import java.util.Properties; 63 64 /*** 65 * FilterSet. Composite element of GOF pattern. 66 * Note: Since it's just a container, it has been marked as 67 * final. No sub-classing should be required. 68 * 69 * @author Pete McKinstry 70 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 71 * 72 * @since 1.0 73 */ 74 public final class FilterSet implements Filter { 75 76 // the list of filters that should be run when this set is 77 // processed. 78 private List filters; 79 // the inbound side of this filter set. 80 private Supplier supplier; 81 // the outbound side of this filter set. 82 private Consumer consumer; 83 // Channel for storage of errored records. 84 private Consumer errors; 85 // the default general purpose channel implementation. 86 private Channel defaultChannel; 87 // the outbound side of the last filter added. used 88 // as the inbound side of the next filter. 89 private Channel previousChannel; 90 91 // for reporting fatal errors from a thread. 92 private ExceptionHandler handler; 93 94 /*** 95 * Default constructor. 96 * 97 * @since 1.0 98 */ 99 public FilterSet() { 100 filters = Collections.synchronizedList(new LinkedList()); 101 // for a default, use a concurrency safe, linked list 102 defaultChannel = new ArrayListQueue(); 103 } 104 105 /*** 106 * Initialize any local resources 107 * 108 * @param p 109 * @throws InitializationException 110 * @since 1.0 111 */ 112 public void init(Properties p) throws InitializationException { 113 // nothing to do. 114 } 115 116 /*** 117 * Return the name of this Filter. 118 * @return String 119 */ 120 public String getName() { 121 return "FilterSet"; 122 } 123 124 /*** 125 * Add a new filter to the set. New filters are appended to the 126 * end of the list & run in that order. 127 * <br/> 128 * When a new filter is added to the set, a channel is created to 129 * connect the previously final filter to the new addition. When 130 * no channel is provided, a default general purpose channel is 131 * used. 132 * 133 * @param f Filter to be added to collection 134 * 135 * @since 1.0 136 */ 137 public void addNode(Filter f) { 138 // use default channel 139 addNode(f, (Channel) defaultChannel.clone()); 140 } 141 142 /*** 143 * Add a new filter to the set. New filters are appended to the 144 * end of the list & run in that order. 145 * <br/> 146 * An optional <code>Channel</code> parameter can be provided as 147 * the preferred Channel implementation for the outbound side of 148 * this filter. 149 * 150 * @param f Filter to be added to collection 151 * @param c Channel to be used on outbound side of this filter. 152 * 153 * @since 1.0 154 */ 155 public void addNode(Filter f, Channel c) { 156 // if it's the first, the process() method 157 // will set things up. 158 if (filters.size() > 0) { 159 f.setInbound(previousChannel); 160 } 161 f.setOutbound(c); // recommended channel impl. 162 previousChannel = c; // inbound side of next filter. 163 filters.add(f); // all set, add it. 164 } 165 166 /*** 167 * do filter processing. 168 * @throws ProcessingException 169 */ 170 public void process() throws ProcessingException { 171 if ((supplier == null) 172 || (consumer == null) 173 || (filters.size() == 0)) { 174 throw new ProcessingException("filter set not properly configured."); 175 } 176 // setup (can't use getFirst & getLast because 177 ((Filter) filters.get(0)).setInbound(supplier); 178 ((Filter) filters.get(filters.size() - 1)).setOutbound(consumer); 179 180 // run all the child filters. 181 Iterator iter = filters.iterator(); 182 Filter f = null; 183 while (iter.hasNext()) { 184 f = (Filter) iter.next(); 185 f.setErrorChannel(errors); 186 f.markForDeath(); 187 f.process(); 188 } 189 } 190 191 /*** 192 * markForDeath sets a flag telling the Filter to shutdown at the 193 * next available opportunity. The filterset does not cycle 194 * continuously, so the implementation is a no-op. 195 * 196 * @since 1.1. 197 */ 198 public void markForDeath() { 199 // no op 200 } 201 202 /*** 203 * Reset the filter so that it's ready to process records again. 204 * For the FilterSet, no reset logic is necessary. 205 * 206 * @since 1.1 207 */ 208 public void reset() { 209 // no op 210 } 211 212 /*** 213 * Runnable interface 214 */ 215 public void run() { 216 try { 217 this.process(); 218 } 219 catch (ProcessingException pe) { 220 getExceptionHandler().reportException(pe); 221 } 222 } 223 224 /*** 225 * Set the inbound delivery mechanism. The first filter 226 * in the set is hooked up to the input mechanism. (adapter) 227 * 228 * @param s 229 * 230 * @since 1.0 231 */ 232 public void setInbound(Supplier s) { 233 this.supplier = s; 234 } 235 236 /*** 237 * Set the outbound delivery mechanism. The final filter 238 * in the set is hooked up to the output mechanism. (adapter) 239 * 240 * @param c 241 * 242 * @since 1.0 243 */ 244 public void setOutbound(Consumer c) { 245 this.consumer = c; 246 } 247 248 /*** 249 * Set the holding bin used for errored records.. 250 * 251 * @param err 252 * 253 * @since 1.0 254 */ 255 public void setErrorChannel(Consumer err) { 256 this.errors = err; 257 } 258 259 /*** 260 * Set the exception handler mechanism. 261 * 262 * @param handler 263 * 264 * @since 1.0 265 */ 266 public void setExceptionHandler(ExceptionHandler handler) { 267 this.handler = handler; 268 } 269 /*** 270 * @return the exception handler 271 */ 272 public ExceptionHandler getExceptionHandler() { 273 return this.handler; 274 } 275 276 /*** 277 * Return the suggested number of threads to launch for this 278 * Filter. Obviously this requires that the application uses 279 * a Pipeline that implements multi-threading. 280 * <br/> 281 * The FilterSet is a single threaded, serial processing 282 * composite Filter. If you wish to multi-thread, add the 283 * Filters to the pipe individually and use a MT pipeline 284 * implementation. 285 * 286 * @return 1 - run only 1 thread for this FilterSet. 287 * 288 * @since 1.1 289 */ 290 public int numThreads() { 291 return 1; 292 } 293 }

This page was automatically generated by Maven