View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc; 48 49 import com.rhi.architecture.resource.InitializationException; 50 51 import java.util.ArrayList; 52 import java.util.List; 53 import java.util.ListIterator; 54 55 /*** 56 * Processing pipeline component. Responsible for running the 57 * filters and pushing the results into the output adapter. 58 * 59 * @author Pete McKinstry 60 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 61 * 62 * @since 1.0 63 */ 64 public class ParallelMTPipeline extends AbstractPipeline { 65 66 // tree of all threads that have been launched. 67 // each filter has a sub-node containing a ThreadGroup 68 // for itself. 69 private ThreadGroup root; 70 71 /*** 72 * Default Constructor. Defaults Channel 73 * to general purpose implementation 74 * 75 * @since 1.0 76 */ 77 public ParallelMTPipeline() { 78 super(); 79 } 80 81 /*** 82 * initialize the pipeline. 83 * @param ch 84 * @throws InitializationException 85 * @see com.rhi.architecture.parc.Pipeline#init(com.rhi.architecture.parc.Channel) 86 */ 87 public void init(Channel ch) throws InitializationException { 88 super.init(ch); 89 } 90 91 /*** 92 * Run the filter set. 93 * @since 1.0 94 * @throws ProcessingException 95 */ 96 public void process() throws ProcessingException { 97 98 log().info("running parallel pipeline."); 99 100 try { 101 List filters = getFilters(); 102 if (filters.size() == 0) { 103 log().debug("no filters, pipeline will pass records through."); 104 return; 105 } 106 107 ListIterator iter = filters.listIterator(); 108 root = new ThreadGroup("filters"); 109 // I don't use ThreadGroup.enumerate( ThreadGroup[] ) 110 // because i'm afraid it won't enforce the ordering of 111 // the thread groups and i'm counting on the order 112 // being the creation order. 113 ArrayList thGrps = new ArrayList(); 114 115 // for each filter, launch a set of threads. 116 while (iter.hasNext()) { 117 Filter f = (Filter) iter.next(); 118 // reset Filter before launching. clears shutdown flag. 119 f.reset(); 120 f.setExceptionHandler(getExceptionHandler()); 121 // thread group name = filter name 122 ThreadGroup grp = new ThreadGroup(root, f.getName()); 123 thGrps.add(grp); 124 int thread_count = (f.numThreads() > 0) ? f.numThreads() : 1; 125 for (int i = 0; i < thread_count; ++i) { 126 Thread filterTh = 127 new Thread( 128 grp, 129 f, 130 f.getName() + Integer.toString(i) /* thread name */ 131 ); 132 //filterTh.setPriority( Thread.NORM_PRIORITY ); 133 filterTh.setDaemon(true); // for fatal error handling. 134 filterTh.start(); 135 } 136 } 137 138 // Shutdown each thread group (Filter) as work is completed. 139 ThreadGroup pendingGroup = null; 140 Filter pendingFilter = null; 141 // For each filter (pipeline order: first in pipe -> last in pipe) 142 // 2) Call markForDeath. Threads will shutdown as they 143 // finish their work. (empty cycle's) 144 // 3) Wait for all threads in this Filter's ThreadGroup to die. 145 // 4) Repeat for the next filter 146 ListIterator filterIterator = filters.listIterator(); 147 ListIterator threadGroupIterator = thGrps.listIterator(); 148 // the 2 iterators should have the same number of elements & 149 // should stay in synch. 150 while (filterIterator.hasNext() 151 && threadGroupIterator.hasNext()) { 152 pendingFilter = (Filter) filterIterator.next(); 153 pendingFilter.markForDeath(); 154 pendingGroup = (ThreadGroup) threadGroupIterator.next(); 155 156 // wait for all Threads in this group to shutdown. 157 while ((pendingGroup.activeCount() > 0) 158 && (getExceptionHandler().hasError() == false)) { 159 try { 160 // i could also use Thread.yield(), but i like more 161 // control over the length of time. In this case, sleeping 162 // for a while is good. 163 Thread.yield(); // sleep(1000); 164 } 165 //catch (InterruptedException ie) { } 166 finally { 167 // ignore. 168 } 169 } 170 if (getExceptionHandler().hasError()) { 171 // Failure occurred, so shutdown any other filters. 172 stopFilters(root); 173 getExceptionHandler().rethrowException(); 174 } 175 log().debug( 176 "ThreadGroup <" 177 + pendingGroup.getName() 178 + "> dead, next... "); 179 } 180 181 if (root.activeCount() > 0) { 182 log().error("All threads should have already been shutdown."); 183 throw new ProcessingException("Bug in thread control logic."); 184 } 185 186 } // try 187 finally { 188 log().info("pipeline complete."); 189 } 190 } 191 192 /*** 193 * Cleanup in-process work and shutdown queues. 194 * 195 * @since 1.0 196 */ 197 public void flush() { 198 log().info("pipeline shutting down."); 199 200 // need to check for outstanding threads 201 while ((root != null) && (root.activeCount() > 0)) { 202 // Don't be as nice during flush() because i'm trying to exit. 203 Thread.yield(); 204 } 205 } 206 207 }

This page was automatically generated by Maven