View Javadoc
1 /* ====================================================================
2 * License:
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in
13 * the documentation and/or other materials provided with the
14 * distribution.
15 *
16 * 3. The end-user documentation included with the redistribution,
17 * if any, must include the following acknowledgment:
18 * "This product includes software developed by
19 * Robert Half International (http://www.rhi.com/)."
20 * Alternately, this acknowledgment may appear in the software itself,
21 * if and wherever such third-party acknowledgments normally appear.
22 *
23 * 4. The names "Parc", "RHI", and "Robert Half International" must
24 * not be used to endorse or promote products derived from this
25 * software without prior written permission. For written
26 * permission, please contact pete.mckinstry@rhi.com.
27 *
28 * 5. Products derived from this software may not be called "PARC",
29 * nor may "PARC" appear in their name, without prior written
30 * permission of Robert Half International.
31 *
32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
43 * SUCH DAMAGE.
44 * ====================================================================
45 *
46 */
47 package com.rhi.architecture.batch;
48
49 import com.rhi.architecture.logging.LogUtil;
50 import com.rhi.architecture.logging.Logger;
51 import com.rhi.architecture.parc.Record;
52 import com.rhi.architecture.resource.InitializationException;
53 import com.rhi.architecture.config.ConfigurationException;
54 import com.rhi.architecture.db.DBUtil;
55 import com.rhi.architecture.xa.DatabaseTransaction;
56 import com.rhi.architecture.xa.Transaction;
57
58 import java.sql.Connection;
59 import java.sql.SQLException;
60 import java.util.Collection;
61 import java.util.HashSet;
62 import java.util.Iterator;
63 import java.util.Properties;
64
65 import javax.sql.DataSource;
66
67 /***
68 * Abstract Audit Agent providing common audit count functionality
69 *
70 * @author Pete McKinstry
71 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved.
72 *
73 * @since 1.0
74 */
75 public abstract class AbstractAuditAgent extends AuditAgent {
76
77 /***
78 * Logger
79 */
80 private static Logger log = null;
81
82 /***
83 * key for audit agent insert sql statement. (in config)
84 */
85 public static final String INSERT_SQL_KEY =
86 "AbstractAuditAgent.InsertRecord";
87 /***
88 * key for audit agent update sql statement. (in config)
89 */
90 public static final String UPDATE_SQL_KEY =
91 "AbstractAuditAgent.UpdateRecord";
92 /***
93 * key for audit agent select sql statement. (in config)
94 */
95 public static final String SELECT_SQL_KEY =
96 "AbstractAuditAgent.SelectRecord";
97
98 /***
99 * variable holding actual sql for insert.
100 */
101 private String insertRecordSQL;
102 /***
103 * variable holding actual sql for update.
104 */
105 private String updateRecordSQL;
106 /***
107 * variable holding actual sql for select.
108 */
109 private String selectRecordSQL;
110
111 /***
112 * Set (ignores duplicates) of all valid record keys. Used to audit
113 * number of records processed.
114 */
115 private HashSet validSet;
116 /***
117 * Set (ignores duplicates) of all errored record keys. Used to audit
118 * number of records processed.
119 */
120 private HashSet errSet;
121
122 /***
123 * lock for concurrency checks.
124 */
125 private Object objLock = new Object();
126
127 /***
128 * Constructor.
129 *
130 * @since 1.0
131 */
132 public AbstractAuditAgent() {
133 validSet = new HashSet();
134 errSet = new HashSet();
135 }
136
137 /***
138 * Initialization Logic
139 *
140 * @param p configuration properties for the audit agent.
141 * @throws InitializationException thrown when a problem is
142 * encountered configuring the audit agent.
143 * @since 1.0
144 */
145 public void init(Properties p) throws InitializationException {
146 try {
147 log = LogUtil.getLogger();
148 }
149 catch (ConfigurationException e) {
150 throw new InitializationException("Unable to create Logger", e);
151 }
152
153 log.debug("init()");
154
155 loadSQL(p);
156
157 configureRecord(p);
158 }
159
160 /***
161 * Allow concrete subclasses a hook to set values on their AuditRecord
162 * class.
163 *
164 * @param p configuration properties for the audit agent.
165 * @throws InitializationException error initializing the audit agent
166 * @since 1.0
167 */
168 public abstract void configureRecord(Properties p)
169 throws InitializationException;
170
171 /***
172 * Helper method called by init() to load the required SQL entries from
173 * the properties file.
174 *
175 * @param p configuration properties for the audit agent.
176 * @throws InitializationException error initializing the audit agent
177 * @since 1.0
178 */
179 protected void loadSQL(Properties p) throws InitializationException {
180
181 // load the SQL for updating the audit counts.
182 this.insertRecordSQL = p.getProperty(getInsertSQLKey(), "not found");
183 if (insertRecordSQL.equals("not found")) {
184 log.error(getInsertSQLKey() + " missing from config properties.");
185 throw new InitializationException(
186 getInsertSQLKey() + " missing from configuration properties.");
187 }
188 this.updateRecordSQL = p.getProperty(getUpdateSQLKey(), "not found");
189 if (updateRecordSQL.equals("not found")) {
190 log.error(getUpdateSQLKey() + " missing from config properties.");
191 throw new InitializationException(
192 getUpdateSQLKey() + " missing from configuration properties.");
193 }
194 this.selectRecordSQL = p.getProperty(getSelectSQLKey(), "not found");
195 if (selectRecordSQL.equals("not found")) {
196 log.error(getSelectSQLKey() + " missing from config properties.");
197 throw new InitializationException(
198 getSelectSQLKey() + " missing from configuration properties.");
199 }
200 }
201
202 /***
203 * Create an appropriate audit /control record. This
204 * method should create a record w/ only a unprocessed
205 * count > 0. The auditCollection method will be called
206 * as records are passed through the pipeline & this
207 * method will modify the counts; subtracting from the
208 * unprocesed, and adding to the errored, valid, or
209 * duplicate counts.
210 *
211 * @param c a collection of records that need to be audited.
212 * @throws AuditException error auditing collection, usually fatal.
213 * @since 1.0
214 */
215 public void createAudits(Collection c) throws AuditException {
216 int size = c.size();
217 log.debug("createAudits(), unprocessed count = " + size);
218
219 Connection conn = null;
220 try {
221 DataSource ds = DBUtil.getDataSource();
222 conn = ds.getConnection();
223 if (loadRecord(conn) == false) {
224 // set unprocessed count.
225 getRecord().setUnprocessedCount(new Integer(size));
226 insertRecord(conn);
227 }
228 else {
229 // set unprocessed count.
230 getRecord().setUnprocessedCount(new Integer(size));
231 updateRecord(conn);
232 }
233 } // try
234 catch (ConfigurationException e) {
235 throw new AuditException("can't create audit record", e);
236 }
237 catch (SQLException e) {
238 throw new AuditException("can't create audit record", e);
239 }
240 finally {
241 try {
242 if (conn != null && !conn.isClosed()) {
243 conn.close();
244 }
245 }
246 catch (SQLException e) {
247 // ignore
248 }
249 } // finally
250 }
251
252 /***
253 * Audit the given collection. This method should
254 * subtract the given collection.size() from the un-
255 * processed count for this audit set & add the
256 * collection.size() in some manner to the processed
257 * columns (valid,errored,duplicate,other)
258 *
259 * @param c - records to be audited.
260 * @param t - the transaction to participate in.
261 * @throws AuditException thrown when an error occurs auditing the given
262 * collection.
263 * @since 1.0
264 */
265 public void auditCollection(Collection c, Transaction t)
266 throws AuditException {
267 log.debug("auditing " + c.size() + " new records.");
268
269 synchronized (objLock) {
270 updateRecordCounts(c);
271
272 // The connection is controlled by the transaction & should
273 // not be closed w/in the audit agent.
274 Connection conn = ((DatabaseTransaction) t).getConnection();
275 updateRecord(conn);
276 }
277 }
278
279 /***
280 * Update record counts based on the new collection.
281 *
282 * @param c update counts for this collection
283 * @throws AuditException thrown if error updating counts.
284 * @since 1.0
285 */
286 protected void updateRecordCounts(Collection c) throws AuditException {
287
288 // The newlyProcessed flag represents the unique new source
289 // records that are counted (subtracted from the unprocessed
290 // count) as part of this audit cycle. It they have previously
291 // been counted, they are not re-counted.
292 int newlyProcessed = 0;
293
294 // extract info from collection.
295 Iterator iter = c.iterator();
296 while (iter.hasNext()) {
297 // this is the pipeline record, not the audit record
298 Record record = (Record) iter.next();
299 Object key = record.getSourceKey();
300 if (record.isValid()) {
301 if (errSet.contains(key) == false) {
302 if (validSet.add(key) == true) {
303 ++newlyProcessed;
304 }
305 }
306 }
307 else {
308 if (validSet.contains(key) == true) {
309 if (validSet.remove(key) == true) {
310 // the decrement is only here because we'll
311 // be incrementing it 2 statements later.
312 // We don't want to double count.
313 --newlyProcessed;
314 }
315 }
316 if (errSet.add(key) == true) {
317 ++newlyProcessed;
318 }
319 }
320 }
321 // calculate new values
322 int processed = validSet.size();
323 int errored = errSet.size();
324 int unprocessed =
325 getRecord().getUnprocessedCount().intValue() - (newlyProcessed);
326 log.debug(
327 "processed = "
328 + processed
329 + ", errored = "
330 + errored
331 + ", unprocessed = "
332 + unprocessed);
333 // Check that you don't have a negative unprocessed value.
334 // This would only happen if you somehow counted the valid &
335 // errored records incorrectly.
336 if (unprocessed < 0) {
337 log.error("control metric totals invalid. u<0");
338 throw new AuditException("control metric totals invalid.");
339 }
340
341 // update control record.
342 getRecord().setProcessedCount(new Integer(processed));
343 getRecord().setErroredCount(new Integer(errored));
344 getRecord().setUnprocessedCount(new Integer(unprocessed));
345 }
346
347 /***
348 * Load record (check for existence.
349 *
350 * @param conn db connection used for audits. (maintaining a single
351 * db transaction)
352 * @return true if records exists, else false.
353 * @throws AuditException error retrieving audit record.
354 * @since 1.0
355 */
356 protected abstract boolean loadRecord(Connection conn)
357 throws AuditException;
358
359 /***
360 * Insert new audit record
361 *
362 * @param conn db connection used for audits in order to maintain all data
363 * updates w/in a single transaction.
364 * @throws AuditException error inserting a new audit record.
365 * @since 1.0
366 */
367 protected abstract void insertRecord(Connection conn)
368 throws AuditException;
369
370 /***
371 * Update record
372 *
373 * @param conn db connection used for audits in order to maintain all data
374 * updates w/in a single transaction.
375 * @throws AuditException error updating a audit record.
376 * @since 1.0
377 */
378 protected abstract void updateRecord(Connection conn)
379 throws AuditException;
380
381 /***
382 * No op. This method is not very useful now that a Transaction is used
383 * for all OutputAdapter commit logic. Subclasses will probably not need
384 * to override this method.
385 *
386 * @throws AuditException error commiting the audits.
387 * @since 1.0
388 */
389 public void commitAudits() throws AuditException {
390 log.debug("commitAudits() - no op");
391 }
392
393 /***
394 * Get concrete AuditRecord
395 * Concrete AuditAgent will return the actual audit record being
396 * used. Allow more flexibility in sub-classes.
397 *
398 * @return AuditRecord the audit record sub-class to be used.
399 * @since 1.0
400 */
401 public abstract AuditRecord getRecord();
402
403 /***
404 * get insert SQL
405 *
406 * @return String insert sql
407 * @since 1.0
408 */
409 protected String getInsertSQL() {
410 return this.insertRecordSQL;
411 }
412
413 /***
414 * get update SQL
415 *
416 * @return String update sql
417 * @since 1.0
418 */
419 protected String getUpdateSQL() {
420 return this.updateRecordSQL;
421 }
422
423 /***
424 * get select SQL
425 *
426 * @return String select sql
427 * @since 1.0
428 */
429 protected String getSelectSQL() {
430 return this.selectRecordSQL;
431 }
432
433 /***
434 * Allow subclasses to override config key.
435 *
436 * @return String insert sql
437 * @since 1.0
438 */
439 protected String getInsertSQLKey() {
440 return AbstractAuditAgent.INSERT_SQL_KEY;
441 }
442
443 /***
444 * Allow subclasses to override config key.
445 *
446 * @return String update sql key
447 * @since 1.0
448 */
449 protected String getUpdateSQLKey() {
450 return AbstractAuditAgent.UPDATE_SQL_KEY;
451 }
452
453 /***
454 * Allow subclasses to override config key.
455 *
456 * @return String select sql key
457 * @since 1.0
458 */
459 protected String getSelectSQLKey() {
460 return AbstractAuditAgent.SELECT_SQL_KEY;
461 }
462
463 /***
464 * Get the Logger reference
465 *
466 * @return Logger log instance.
467 * @since 1.0
468 */
469 public Logger log() {
470 return AbstractAuditAgent.log;
471 }
472
473 }
This page was automatically generated by Maven