View Javadoc
1 /* ==================================================================== 2 * License: 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in 13 * the documentation and/or other materials provided with the 14 * distribution. 15 * 16 * 3. The end-user documentation included with the redistribution, 17 * if any, must include the following acknowledgment: 18 * "This product includes software developed by 19 * Robert Half International (http://www.rhi.com/)." 20 * Alternately, this acknowledgment may appear in the software itself, 21 * if and wherever such third-party acknowledgments normally appear. 22 * 23 * 4. The names "Parc", "RHI", and "Robert Half International" must 24 * not be used to endorse or promote products derived from this 25 * software without prior written permission. For written 26 * permission, please contact pete.mckinstry@rhi.com. 27 * 28 * 5. Products derived from this software may not be called "PARC", 29 * nor may "PARC" appear in their name, without prior written 30 * permission of Robert Half International. 31 * 32 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED 33 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 34 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 35 * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR 36 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF 39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 40 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 41 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 42 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 43 * SUCH DAMAGE. 44 * ==================================================================== 45 * 46 */ 47 package com.rhi.architecture.batch; 48 49 import com.rhi.architecture.logging.LogUtil; 50 import com.rhi.architecture.logging.Logger; 51 import com.rhi.architecture.parc.Record; 52 import com.rhi.architecture.resource.InitializationException; 53 import com.rhi.architecture.config.ConfigurationException; 54 import com.rhi.architecture.db.DBUtil; 55 import com.rhi.architecture.xa.DatabaseTransaction; 56 import com.rhi.architecture.xa.Transaction; 57 58 import java.sql.Connection; 59 import java.sql.SQLException; 60 import java.util.Collection; 61 import java.util.HashSet; 62 import java.util.Iterator; 63 import java.util.Properties; 64 65 import javax.sql.DataSource; 66 67 /*** 68 * Abstract Audit Agent providing common audit count functionality 69 * 70 * @author Pete McKinstry 71 * @copyright 2002, Robert Half Int'l., Inc. All rights reserved. 72 * 73 * @since 1.0 74 */ 75 public abstract class AbstractAuditAgent extends AuditAgent { 76 77 /*** 78 * Logger 79 */ 80 private static Logger log = null; 81 82 /*** 83 * key for audit agent insert sql statement. (in config) 84 */ 85 public static final String INSERT_SQL_KEY = 86 "AbstractAuditAgent.InsertRecord"; 87 /*** 88 * key for audit agent update sql statement. (in config) 89 */ 90 public static final String UPDATE_SQL_KEY = 91 "AbstractAuditAgent.UpdateRecord"; 92 /*** 93 * key for audit agent select sql statement. (in config) 94 */ 95 public static final String SELECT_SQL_KEY = 96 "AbstractAuditAgent.SelectRecord"; 97 98 /*** 99 * variable holding actual sql for insert. 100 */ 101 private String insertRecordSQL; 102 /*** 103 * variable holding actual sql for update. 104 */ 105 private String updateRecordSQL; 106 /*** 107 * variable holding actual sql for select. 108 */ 109 private String selectRecordSQL; 110 111 /*** 112 * Set (ignores duplicates) of all valid record keys. Used to audit 113 * number of records processed. 114 */ 115 private HashSet validSet; 116 /*** 117 * Set (ignores duplicates) of all errored record keys. Used to audit 118 * number of records processed. 119 */ 120 private HashSet errSet; 121 122 /*** 123 * lock for concurrency checks. 124 */ 125 private Object objLock = new Object(); 126 127 /*** 128 * Constructor. 129 * 130 * @since 1.0 131 */ 132 public AbstractAuditAgent() { 133 validSet = new HashSet(); 134 errSet = new HashSet(); 135 } 136 137 /*** 138 * Initialization Logic 139 * 140 * @param p configuration properties for the audit agent. 141 * @throws InitializationException thrown when a problem is 142 * encountered configuring the audit agent. 143 * @since 1.0 144 */ 145 public void init(Properties p) throws InitializationException { 146 try { 147 log = LogUtil.getLogger(); 148 } 149 catch (ConfigurationException e) { 150 throw new InitializationException("Unable to create Logger", e); 151 } 152 153 log.debug("init()"); 154 155 loadSQL(p); 156 157 configureRecord(p); 158 } 159 160 /*** 161 * Allow concrete subclasses a hook to set values on their AuditRecord 162 * class. 163 * 164 * @param p configuration properties for the audit agent. 165 * @throws InitializationException error initializing the audit agent 166 * @since 1.0 167 */ 168 public abstract void configureRecord(Properties p) 169 throws InitializationException; 170 171 /*** 172 * Helper method called by init() to load the required SQL entries from 173 * the properties file. 174 * 175 * @param p configuration properties for the audit agent. 176 * @throws InitializationException error initializing the audit agent 177 * @since 1.0 178 */ 179 protected void loadSQL(Properties p) throws InitializationException { 180 181 // load the SQL for updating the audit counts. 182 this.insertRecordSQL = p.getProperty(getInsertSQLKey(), "not found"); 183 if (insertRecordSQL.equals("not found")) { 184 log.error(getInsertSQLKey() + " missing from config properties."); 185 throw new InitializationException( 186 getInsertSQLKey() + " missing from configuration properties."); 187 } 188 this.updateRecordSQL = p.getProperty(getUpdateSQLKey(), "not found"); 189 if (updateRecordSQL.equals("not found")) { 190 log.error(getUpdateSQLKey() + " missing from config properties."); 191 throw new InitializationException( 192 getUpdateSQLKey() + " missing from configuration properties."); 193 } 194 this.selectRecordSQL = p.getProperty(getSelectSQLKey(), "not found"); 195 if (selectRecordSQL.equals("not found")) { 196 log.error(getSelectSQLKey() + " missing from config properties."); 197 throw new InitializationException( 198 getSelectSQLKey() + " missing from configuration properties."); 199 } 200 } 201 202 /*** 203 * Create an appropriate audit /control record. This 204 * method should create a record w/ only a unprocessed 205 * count > 0. The auditCollection method will be called 206 * as records are passed through the pipeline & this 207 * method will modify the counts; subtracting from the 208 * unprocesed, and adding to the errored, valid, or 209 * duplicate counts. 210 * 211 * @param c a collection of records that need to be audited. 212 * @throws AuditException error auditing collection, usually fatal. 213 * @since 1.0 214 */ 215 public void createAudits(Collection c) throws AuditException { 216 int size = c.size(); 217 log.debug("createAudits(), unprocessed count = " + size); 218 219 Connection conn = null; 220 try { 221 DataSource ds = DBUtil.getDataSource(); 222 conn = ds.getConnection(); 223 if (loadRecord(conn) == false) { 224 // set unprocessed count. 225 getRecord().setUnprocessedCount(new Integer(size)); 226 insertRecord(conn); 227 } 228 else { 229 // set unprocessed count. 230 getRecord().setUnprocessedCount(new Integer(size)); 231 updateRecord(conn); 232 } 233 } // try 234 catch (ConfigurationException e) { 235 throw new AuditException("can't create audit record", e); 236 } 237 catch (SQLException e) { 238 throw new AuditException("can't create audit record", e); 239 } 240 finally { 241 try { 242 if (conn != null && !conn.isClosed()) { 243 conn.close(); 244 } 245 } 246 catch (SQLException e) { 247 // ignore 248 } 249 } // finally 250 } 251 252 /*** 253 * Audit the given collection. This method should 254 * subtract the given collection.size() from the un- 255 * processed count for this audit set & add the 256 * collection.size() in some manner to the processed 257 * columns (valid,errored,duplicate,other) 258 * 259 * @param c - records to be audited. 260 * @param t - the transaction to participate in. 261 * @throws AuditException thrown when an error occurs auditing the given 262 * collection. 263 * @since 1.0 264 */ 265 public void auditCollection(Collection c, Transaction t) 266 throws AuditException { 267 log.debug("auditing " + c.size() + " new records."); 268 269 synchronized (objLock) { 270 updateRecordCounts(c); 271 272 // The connection is controlled by the transaction & should 273 // not be closed w/in the audit agent. 274 Connection conn = ((DatabaseTransaction) t).getConnection(); 275 updateRecord(conn); 276 } 277 } 278 279 /*** 280 * Update record counts based on the new collection. 281 * 282 * @param c update counts for this collection 283 * @throws AuditException thrown if error updating counts. 284 * @since 1.0 285 */ 286 protected void updateRecordCounts(Collection c) throws AuditException { 287 288 // The newlyProcessed flag represents the unique new source 289 // records that are counted (subtracted from the unprocessed 290 // count) as part of this audit cycle. It they have previously 291 // been counted, they are not re-counted. 292 int newlyProcessed = 0; 293 294 // extract info from collection. 295 Iterator iter = c.iterator(); 296 while (iter.hasNext()) { 297 // this is the pipeline record, not the audit record 298 Record record = (Record) iter.next(); 299 Object key = record.getSourceKey(); 300 if (record.isValid()) { 301 if (errSet.contains(key) == false) { 302 if (validSet.add(key) == true) { 303 ++newlyProcessed; 304 } 305 } 306 } 307 else { 308 if (validSet.contains(key) == true) { 309 if (validSet.remove(key) == true) { 310 // the decrement is only here because we'll 311 // be incrementing it 2 statements later. 312 // We don't want to double count. 313 --newlyProcessed; 314 } 315 } 316 if (errSet.add(key) == true) { 317 ++newlyProcessed; 318 } 319 } 320 } 321 // calculate new values 322 int processed = validSet.size(); 323 int errored = errSet.size(); 324 int unprocessed = 325 getRecord().getUnprocessedCount().intValue() - (newlyProcessed); 326 log.debug( 327 "processed = " 328 + processed 329 + ", errored = " 330 + errored 331 + ", unprocessed = " 332 + unprocessed); 333 // Check that you don't have a negative unprocessed value. 334 // This would only happen if you somehow counted the valid & 335 // errored records incorrectly. 336 if (unprocessed < 0) { 337 log.error("control metric totals invalid. u<0"); 338 throw new AuditException("control metric totals invalid."); 339 } 340 341 // update control record. 342 getRecord().setProcessedCount(new Integer(processed)); 343 getRecord().setErroredCount(new Integer(errored)); 344 getRecord().setUnprocessedCount(new Integer(unprocessed)); 345 } 346 347 /*** 348 * Load record (check for existence. 349 * 350 * @param conn db connection used for audits. (maintaining a single 351 * db transaction) 352 * @return true if records exists, else false. 353 * @throws AuditException error retrieving audit record. 354 * @since 1.0 355 */ 356 protected abstract boolean loadRecord(Connection conn) 357 throws AuditException; 358 359 /*** 360 * Insert new audit record 361 * 362 * @param conn db connection used for audits in order to maintain all data 363 * updates w/in a single transaction. 364 * @throws AuditException error inserting a new audit record. 365 * @since 1.0 366 */ 367 protected abstract void insertRecord(Connection conn) 368 throws AuditException; 369 370 /*** 371 * Update record 372 * 373 * @param conn db connection used for audits in order to maintain all data 374 * updates w/in a single transaction. 375 * @throws AuditException error updating a audit record. 376 * @since 1.0 377 */ 378 protected abstract void updateRecord(Connection conn) 379 throws AuditException; 380 381 /*** 382 * No op. This method is not very useful now that a Transaction is used 383 * for all OutputAdapter commit logic. Subclasses will probably not need 384 * to override this method. 385 * 386 * @throws AuditException error commiting the audits. 387 * @since 1.0 388 */ 389 public void commitAudits() throws AuditException { 390 log.debug("commitAudits() - no op"); 391 } 392 393 /*** 394 * Get concrete AuditRecord 395 * Concrete AuditAgent will return the actual audit record being 396 * used. Allow more flexibility in sub-classes. 397 * 398 * @return AuditRecord the audit record sub-class to be used. 399 * @since 1.0 400 */ 401 public abstract AuditRecord getRecord(); 402 403 /*** 404 * get insert SQL 405 * 406 * @return String insert sql 407 * @since 1.0 408 */ 409 protected String getInsertSQL() { 410 return this.insertRecordSQL; 411 } 412 413 /*** 414 * get update SQL 415 * 416 * @return String update sql 417 * @since 1.0 418 */ 419 protected String getUpdateSQL() { 420 return this.updateRecordSQL; 421 } 422 423 /*** 424 * get select SQL 425 * 426 * @return String select sql 427 * @since 1.0 428 */ 429 protected String getSelectSQL() { 430 return this.selectRecordSQL; 431 } 432 433 /*** 434 * Allow subclasses to override config key. 435 * 436 * @return String insert sql 437 * @since 1.0 438 */ 439 protected String getInsertSQLKey() { 440 return AbstractAuditAgent.INSERT_SQL_KEY; 441 } 442 443 /*** 444 * Allow subclasses to override config key. 445 * 446 * @return String update sql key 447 * @since 1.0 448 */ 449 protected String getUpdateSQLKey() { 450 return AbstractAuditAgent.UPDATE_SQL_KEY; 451 } 452 453 /*** 454 * Allow subclasses to override config key. 455 * 456 * @return String select sql key 457 * @since 1.0 458 */ 459 protected String getSelectSQLKey() { 460 return AbstractAuditAgent.SELECT_SQL_KEY; 461 } 462 463 /*** 464 * Get the Logger reference 465 * 466 * @return Logger log instance. 467 * @since 1.0 468 */ 469 public Logger log() { 470 return AbstractAuditAgent.log; 471 } 472 473 }

This page was automatically generated by Maven