View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc;
48
49 import com.rhi.architecture.resource.InitializationException;
50
51 import java.util.ArrayList;
52 import java.util.List;
53 import java.util.ListIterator;
54
55 /***
56 * Processing pipeline component. Responsible for running the
57 * filters and pushing the results into the output adapter.
58 *
59 * @author Pete McKinstry
60 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
61 *
62 * @since 1.0
63 */
64 public class ParallelMTPipeline extends AbstractPipeline {
65
66 // tree of all threads that have been launched.
67 // each filter has a sub-node containing a ThreadGroup
68 // for itself.
69 private ThreadGroup root;
70
71 /***
72 * Default Constructor. Defaults Channel
73 * to general purpose implementation
74 *
75 * @since 1.0
76 */
77 public ParallelMTPipeline() {
78 super();
79 }
80
81 /***
82 * initialize the pipeline.
83 * @param ch
84 * @throws InitializationException
85 * @see com.rhi.architecture.parc.Pipeline#init(com.rhi.architecture.parc.Channel)
86 */
87 public void init(Channel ch) throws InitializationException {
88 super.init(ch);
89 }
90
91 /***
92 * Run the filter set.
93 * @since 1.0
94 * @throws ProcessingException
95 */
96 public void process() throws ProcessingException {
97
98 log().info("running parallel pipeline.");
99
100 try {
101 List filters = getFilters();
102 if (filters.size() == 0) {
103 log().debug("no filters, pipeline will pass records through.");
104 return;
105 }
106
107 ListIterator iter = filters.listIterator();
108 root = new ThreadGroup("filters");
109 // I don't use ThreadGroup.enumerate( ThreadGroup[] )
110 // because i'm afraid it won't enforce the ordering of
111 // the thread groups and i'm counting on the order
112 // being the creation order.
113 ArrayList thGrps = new ArrayList();
114
115 // for each filter, launch a set of threads.
116 while (iter.hasNext()) {
117 Filter f = (Filter) iter.next();
118 // reset Filter before launching. clears shutdown flag.
119 f.reset();
120 f.setExceptionHandler(getExceptionHandler());
121 // thread group name = filter name
122 ThreadGroup grp = new ThreadGroup(root, f.getName());
123 thGrps.add(grp);
124 int thread_count = (f.numThreads() > 0) ? f.numThreads() : 1;
125 for (int i = 0; i < thread_count; ++i) {
126 Thread filterTh =
127 new Thread(
128 grp,
129 f,
130 f.getName() + Integer.toString(i) /* thread name */
131 );
132 //filterTh.setPriority( Thread.NORM_PRIORITY );
133 filterTh.setDaemon(true); // for fatal error handling.
134 filterTh.start();
135 }
136 }
137
138 // Shutdown each thread group (Filter) as work is completed.
139 ThreadGroup pendingGroup = null;
140 Filter pendingFilter = null;
141 // For each filter (pipeline order: first in pipe -> last in pipe)
142 // 2) Call markForDeath. Threads will shutdown as they
143 // finish their work. (empty cycle's)
144 // 3) Wait for all threads in this Filter's ThreadGroup to die.
145 // 4) Repeat for the next filter
146 ListIterator filterIterator = filters.listIterator();
147 ListIterator threadGroupIterator = thGrps.listIterator();
148 // the 2 iterators should have the same number of elements &
149 // should stay in synch.
150 while (filterIterator.hasNext()
151 && threadGroupIterator.hasNext()) {
152 pendingFilter = (Filter) filterIterator.next();
153 pendingFilter.markForDeath();
154 pendingGroup = (ThreadGroup) threadGroupIterator.next();
155
156 // wait for all Threads in this group to shutdown.
157 while ((pendingGroup.activeCount() > 0)
158 && (getExceptionHandler().hasError() == false)) {
159 try {
160 // i could also use Thread.yield(), but i like more
161 // control over the length of time. In this case, sleeping
162 // for a while is good.
163 Thread.yield(); // sleep(1000);
164 }
165 //catch (InterruptedException ie) { }
166 finally {
167 // ignore.
168 }
169 }
170 if (getExceptionHandler().hasError()) {
171 // Failure occurred, so shutdown any other filters.
172 stopFilters(root);
173 getExceptionHandler().rethrowException();
174 }
175 log().debug(
176 "ThreadGroup <"
177 + pendingGroup.getName()
178 + "> dead, next... ");
179 }
180
181 if (root.activeCount() > 0) {
182 log().error("All threads should have already been shutdown.");
183 throw new ProcessingException("Bug in thread control logic.");
184 }
185
186 } // try
187 finally {
188 log().info("pipeline complete.");
189 }
190 }
191
192 /***
193 * Cleanup in-process work and shutdown queues.
194 *
195 * @since 1.0
196 */
197 public void flush() {
198 log().info("pipeline shutting down.");
199
200 // need to check for outstanding threads
201 while ((root != null) && (root.activeCount() > 0)) {
202 // Don't be as nice during flush() because i'm trying to exit.
203 Thread.yield();
204 }
205 }
206
207 }
This page was automatically generated by Maven