View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.parc.adapter;
48
49 import com.rhi.architecture.batch.AuditAgent;
50 import com.rhi.architecture.batch.AuditException;
51 import com.rhi.architecture.config.ConfigurationException;
52 import com.rhi.architecture.logging.LogUtil;
53 import com.rhi.architecture.logging.Logger;
54 import com.rhi.architecture.parc.Channel;
55 import com.rhi.architecture.parc.InputAdapter;
56 import com.rhi.architecture.parc.ProcessingException;
57 import com.rhi.architecture.parc.Record;
58 import com.rhi.architecture.resource.InitializationException;
59
60 import java.util.ArrayList;
61 import java.util.Collection;
62 import java.util.Properties;
63
64 /***
65 * The InputAdapter is responsible for creating the work set
66 * that the pipeline will execute on. Common implementations
67 * will load records from either a file
68 * or from the database.
69 *
70 * @author Pete McKinstry
71 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
72 *
73 * @since 1.0
74 */
75 public abstract class AbstractInputAdapter implements InputAdapter {
76
77 // for normal logging, use the log category.
78 private static Logger log = null;
79
80 private AuditAgent auditAgent;
81 private Channel errorChannel;
82 private Collection errors;
83
84 /***
85 * Default contstructor
86 *
87 * @since 1.0
88 */
89 public AbstractInputAdapter() {
90 this.errors = new ArrayList();
91 }
92
93 /***
94 * Set method to tell the InputAdapter what audit
95 * agent to use. The input Adapter creates the initial
96 * audit record w/ the # of records being processed.
97 *
98 * @param agent
99 *
100 * @since 1.0
101 */
102 public final void setAuditAgent(AuditAgent agent) {
103 this.auditAgent = agent;
104 }
105 /***
106 * get audit agent
107 * @return AuditAgent
108 *
109 * @since 1.0
110 */
111 public final AuditAgent getAuditAgent() {
112 return auditAgent;
113 }
114
115 /***
116 * No-op init() method. Meant to be overriden if
117 * necessary.
118 *
119 * @param props
120 * @throws InitializationException
121 * @since 1.0
122 */
123 public void init(Properties props) throws InitializationException {
124 try {
125 log = LogUtil.getLogger();
126 }
127 catch (ConfigurationException e) {
128 throw new InitializationException(e.toString(), e);
129 }
130 }
131
132 /***
133 * No-op cleanup method. Meant to be overriden if
134 * necessary.
135 *
136 * @since 1.0
137 */
138 public void cleanup() {
139 // no op
140 }
141
142 /***
143 * Push a set of records into the pipeline.
144 *
145 * @param validChannel inbound side of the pipe.
146 * @return int number of records processed.
147 * @throws ProcessingException
148 * @since 1.0
149 */
150 public int push(Channel validChannel) throws ProcessingException {
151
152 // load records
153 Collection valid = loadBatch();
154
155 Collection errors = getErrors();
156
157 Collection all = new ArrayList();
158 all.addAll(valid);
159 all.addAll(errors);
160
161 int size = all.size();
162 if (size <= 0) {
163 log.debug("no records to process.");
164 return size;
165 }
166
167 try {
168 // create an audit record w/ the correct counts.
169 auditAgent.createAudits(all);
170 }
171 catch (AuditException ae) {
172 log.error("AuditException thrown by createAudits()", ae);
173 throw new ProcessingException(
174 "AuditException in createAudits",
175 ae);
176 }
177
178 // push the records into the channel.
179 errorChannel.push(errors);
180 validChannel.push(valid);
181 errors.clear();
182 return size;
183 }
184
185 /***
186 * Retrieve the batch.
187 *
188 * @return Collection - the records bound for the channel.
189 * @throws ProcessingException
190 * @since 1.0
191 */
192 protected abstract Collection loadBatch() throws ProcessingException;
193
194 /***
195 * set the error channel.
196 * <p/>
197 * This method can be used by input adapters in cases where they
198 * have parsing errors or other non-fatal record level errors
199 * when loading a batch of records.
200 * @param errorChannel
201 * @since 1.0
202 */
203 public void setErrorChannel(Channel errorChannel) {
204 this.errorChannel = errorChannel;
205 }
206
207 /***
208 * get errors
209 * @return Collection
210 * @since 1.1
211 */
212 protected Collection getErrors() {
213 return this.errors;
214 }
215
216 /***
217 * The concrete input adapter can use this method to add
218 * error records to the error channel while maintaining
219 * the correct audit entries for valid (returned from
220 * loadBatch() & pushed into the normal pipeline channel)
221 * & errors (added by calling addError() & pushed into
222 * the error channel.
223 * @param record
224 * @since 1.0
225 */
226 public final void addToErrorChannel(Record record) {
227 getErrors().add(record);
228 }
229
230 }
This page was automatically generated by Maven