View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.parc.filter; 48 49 import com.rhi.architecture.parc.ProcessingException; 50 import com.rhi.architecture.resource.InitializationException; 51 52 import java.util.Collection; 53 import java.util.Properties; 54 55 /*** 56 * PollingFilter.java 57 * 58 * @author <a href="mailto:pete.mckinstry@rhi.com">Pete McKinstry</a> 59 * @copyright 2002, Robert Half Int'l. All rights reserved. 60 * 61 * @version 1.0 62 */ 63 public abstract class PollingFilter extends AbstractFilter { 64 65 /*** 66 * time to sleep between polling. 67 */ 68 public static final String CYCLE_SLEEP_TIME_FLAG = 69 "PollingFilter.cycleSleepTime"; 70 /*** 71 * default sleep time between polls 72 */ 73 public static final String DEFAULT_SLEEP_TIME = "1000"; // default 74 private int sleepTime; 75 76 /*** 77 * Constructor for PollingFilter. 78 */ 79 public PollingFilter() { 80 super(); 81 } 82 83 /*** 84 * Constructor for PollingFilter. 85 * @param max 86 */ 87 public PollingFilter(int max) { 88 super(max); 89 } 90 91 /*** 92 * @see com.rhi.architecture.parc.Filter#init(Properties) 93 * @param p 94 * @throws InitializationException 95 */ 96 public void init(Properties p) throws InitializationException { 97 super.init(p); 98 99 String sleepTimeStr = 100 p.getProperty(CYCLE_SLEEP_TIME_FLAG, DEFAULT_SLEEP_TIME); 101 try { 102 sleepTime = Integer.parseInt(sleepTimeStr); 103 } 104 catch (NumberFormatException e) { 105 throw new InitializationException( 106 CYCLE_SLEEP_TIME_FLAG 107 + " provided, but is not a valid integer."); 108 } 109 } 110 111 /*** 112 * Push a collection of records through the filter. 113 * <br /> 114 * Details: The NotificationFilter processes records w/in 115 * a dependent loop. The exit criteria for the loop 116 * is the markForDeath() flag which should be set by 117 * the pipeline, _and_ an empty processing cycle. This 118 * allows the Strategy to flush the Pipeline w/o knowing 119 * much about the inner workings of the Filter. While 120 * this criteria is false, the Filter does this: 121 * (pseudo-code) 122 * <code> 123 * InboundChannel.pull(); 124 * doWork() <abstract> 125 * OutboundChannel.push(); 126 * </code> 127 * <p/> 128 * This method must be threadsafe as it is the 129 * fundamental multi-processing hook in the framework. 130 * @throws ProcessingException 131 * @since 1.0 132 */ 133 public void process() throws ProcessingException { 134 int recordCount = 0; 135 long processingTime = 0; 136 while (true) { 137 Collection in = getInbound().pull(getMaxRecords()); 138 int size = in.size(); 139 if (size > 0) { 140 if (log().isDebugEnabled()) { 141 log().debug( 142 "filer::process() cycle detected for " 143 + "Filter: <" 144 + getName() 145 + ">, " 146 + "thread = " 147 + Thread.currentThread().getName()); 148 } 149 recordCount += in.size(); 150 long startTime = System.currentTimeMillis(); 151 Collection out = doWork(in); 152 getOutbound().push(out); 153 long endTime = System.currentTimeMillis(); 154 processingTime += (endTime - startTime); 155 } 156 else { 157 log().debug( 158 "filter::process() idle cycle for " 159 + "Filter: <" 160 + getName() 161 + ">, " 162 + "thread=" 163 + Thread.currentThread().getName()); 164 // Marked for death _&_ hit an idle cycle, quit. 165 if (isShutdown() == true) { 166 break; 167 } 168 // If not marked for death, wait for notification from the 169 // suppler that new records are available for processing. 170 try { 171 Thread.sleep(sleepTime); 172 } 173 catch (InterruptedException e) { 174 // ignore 175 } 176 } // else 177 } // while loop 178 179 if (stats().isInfoEnabled()) { 180 stats().info( 181 "Filter <" 182 + getName() 183 + "> processed " 184 + recordCount 185 + " records in " 186 + processingTime 187 + " milliseconds"); 188 } 189 return; 190 } 191 192 }

This page was automatically generated by Maven