View Javadoc
/* ====================================================================
 * License:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 *
 * 3. The end-user documentation included with the redistribution,
 *    if any, must include the following acknowledgment:
 *       "This product includes software developed by
 *        Robert Half International (http://www.rhi.com/)."
 *    Alternately, this acknowledgment may appear in the software itself,
 *    if and wherever such third-party acknowledgments normally appear.
 *
 * 4. The names "Parc", "RHI", and "Robert Half International" must
 *    not be used to endorse or promote products derived from this
 *    software without prior written permission. For written
 *    permission, please contact pete.mckinstry@rhi.com.
 *
 * 5. Products derived from this software may not be called "PARC",
 *    nor may "PARC" appear in their name, without prior written
 *    permission of Robert Half International.
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 * ====================================================================
 *
 */
package com.rhi.architecture.parc.adapter;

import com.rhi.architecture.batch.AuditAgent;
import com.rhi.architecture.batch.AuditException;
import com.rhi.architecture.config.ConfigurationException;
import com.rhi.architecture.logging.LogUtil;
import com.rhi.architecture.logging.Logger;
import com.rhi.architecture.parc.Channel;
import com.rhi.architecture.parc.InputAdapter;
import com.rhi.architecture.parc.ProcessingException;
import com.rhi.architecture.parc.Record;
import com.rhi.architecture.resource.InitializationException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;

/**
 * The InputAdapter is responsible for creating the work set
 * that the pipeline will execute on. Common implementations
 * will load records from either a file
 * or from the database.
 * <p>
 * This base class implements {@link #push(Channel)}: it asks the
 * subclass for a batch via {@link #loadBatch()}, combines it with any
 * records registered through {@link #addToErrorChannel(Record)}, has
 * the audit agent create audits for the combined set, and then pushes
 * the error records to the error channel and the valid records to the
 * channel passed by the caller.
 *
 * @author Pete McKinstry
 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
 *
 * @since 1.0
 */
public abstract class AbstractInputAdapter implements InputAdapter {

    // for normal logging, use the log category.
    // Assigned by init(); stays null if init() was never run on any
    // instance, so every use below is null-guarded.
    private static Logger log = null;

    private AuditAgent auditAgent;
    private Channel errorChannel;
    // Error records accumulated via addToErrorChannel() until the next
    // successful push().
    private Collection errors;

    /**
     * Default constructor. Starts with an empty error collection.
     *
     * @since 1.0
     */
    public AbstractInputAdapter() {
        this.errors = new ArrayList();
    }

    /**
     * Set method to tell the InputAdapter what audit
     * agent to use. The input adapter creates the initial
     * audit record w/ the # of records being processed.
     *
     * @param agent the audit agent used by {@link #push(Channel)}
     *
     * @since 1.0
     */
    public final void setAuditAgent(AuditAgent agent) {
        this.auditAgent = agent;
    }

    /**
     * Get the audit agent.
     *
     * @return AuditAgent the agent set through
     *         {@link #setAuditAgent(AuditAgent)}, or <code>null</code>
     *         if none has been set
     *
     * @since 1.0
     */
    public final AuditAgent getAuditAgent() {
        return auditAgent;
    }

    /**
     * Obtains the logger from <code>LogUtil</code>. The properties
     * argument is not read by this implementation.
     * <p>
     * Meant to be overridden if necessary. An override that does not
     * call <code>super.init(props)</code> leaves the logger unset, in
     * which case {@link #push(Channel)} simply skips its log output.
     *
     * @param props initialization properties (unused here)
     * @throws InitializationException if the logger cannot be
     *         obtained; the underlying ConfigurationException is
     *         attached as the cause
     * @since 1.0
     */
    public void init(Properties props) throws InitializationException {
        try {
            log = LogUtil.getLogger();
        }
        catch (ConfigurationException e) {
            throw new InitializationException(e.toString(), e);
        }
    }

    /**
     * No-op cleanup method. Meant to be overridden if
     * necessary.
     *
     * @since 1.0
     */
    public void cleanup() {
        // no op
    }

    /**
     * Push a set of records into the pipeline.
     * <p>
     * Valid records come from {@link #loadBatch()}; error records are
     * those currently returned by {@link #getErrors()}. Audits are
     * created for both sets together before anything is pushed. After
     * a successful push the pending error collection is cleared.
     *
     * @param validChannel inbound side of the pipe.
     * @return int number of records processed (valid plus errors);
     *         0 when there was nothing to do.
     * @throws ProcessingException if loading the batch fails or the
     *         audit agent throws an AuditException
     * @throws IllegalArgumentException if there are records to push
     *         and <code>validChannel</code> is <code>null</code>
     * @throws IllegalStateException if there are records to push and
     *         no audit agent has been set, or there are error records
     *         and no error channel has been set
     * @since 1.0
     */
    public int push(Channel validChannel) throws ProcessingException {

        // load records; a null batch is treated like an empty one.
        Collection valid = loadBatch();
        if (valid == null) {
            valid = new ArrayList();
        }

        // 'pending' is the live collection addToErrorChannel() appends
        // to. The channel receives a snapshot instead, so that the
        // clear() at the end cannot empty a collection the channel may
        // still be holding on to.
        Collection pending = getErrors();
        Collection rejected = new ArrayList(pending);

        Collection all = new ArrayList(valid);
        all.addAll(rejected);

        int size = all.size();
        if (size <= 0) {
            if (log != null) {
                log.debug("no records to process.");
            }
            return size;
        }

        // Validate collaborators before creating audits, so that a
        // misconfigured adapter fails before any side effect happens.
        if (validChannel == null) {
            throw new IllegalArgumentException(
                "validChannel must not be null");
        }
        if (auditAgent == null) {
            throw new IllegalStateException(
                "no audit agent set; call setAuditAgent() before push()");
        }
        if (errorChannel == null && !rejected.isEmpty()) {
            throw new IllegalStateException(
                "no error channel set; call setErrorChannel() before push()");
        }

        try {
            // create an audit record w/ the correct counts.
            auditAgent.createAudits(all);
        }
        catch (AuditException ae) {
            if (log != null) {
                log.error("AuditException thrown by createAudits()", ae);
            }
            throw new ProcessingException(
                "AuditException in createAudits",
                ae);
        }

        // push the records into the channels. errorChannel can only be
        // null here when there are no error records to deliver.
        if (errorChannel != null) {
            errorChannel.push(rejected);
        }
        validChannel.push(valid);
        pending.clear();
        return size;
    }

    /**
     * Retrieve the batch.
     *
     * @return Collection - the records bound for the channel.
     * @throws ProcessingException if the batch cannot be loaded
     * @since 1.0
     */
    protected abstract Collection loadBatch() throws ProcessingException;

    /**
     * Set the error channel.
     * <p>
     * This method can be used by input adapters in cases where they
     * have parsing errors or other non-fatal record level errors
     * when loading a batch of records.
     *
     * @param errorChannel channel that receives the error records
     *        during {@link #push(Channel)}
     * @since 1.0
     */
    public void setErrorChannel(Channel errorChannel) {
        this.errorChannel = errorChannel;
    }

    /**
     * Get the pending error records.
     *
     * @return Collection the live internal collection (not a copy) of
     *         records added through {@link #addToErrorChannel(Record)}
     *         since the last successful push
     * @since 1.1
     */
    protected Collection getErrors() {
        return this.errors;
    }

    /**
     * The concrete input adapter can use this method to add
     * error records to the error channel while maintaining
     * the correct audit entries for valid records (returned from
     * loadBatch() and pushed into the normal pipeline channel)
     * and error records (added by calling addToErrorChannel() and
     * pushed into the error channel).
     * <p>
     * The record is only queued here; it is delivered to the error
     * channel by the next {@link #push(Channel)}.
     *
     * @param record the error record to queue
     * @since 1.0
     */
    public final void addToErrorChannel(Record record) {
        getErrors().add(record);
    }

}

This page was automatically generated by Maven