Sempre que é feito um job,uma das preocupações do lado da aplicação é em como não levar todos os registros necessários para a memória. Existem algumas maneiras para se fazer isso, sendo que as duas mais comuns que vejo são queries limitadas e queries paginadas. Além dessas duas formas existe uma terceira que é limitando a quantidade de registros que o banco retorna através da opção “fetch size” no driver.
Limitação na query
Esse é um dos jeitos mais utilizados que vejo. O uso consiste em rodar a mesma query diversas vezes, retornando uma quantidade limitada de registros e, ao processar o registro, fazer um update no mesmo para que esse registro não retorne novamente na query. O job irá executar a query diversas vezes, até não ser retornado mais registros, o que indica que o job terminou o seu processamento.
Segue um exemplo de como ficaria o job:
int processed = 0;
List<OutboxEntity> toProcess = outboxRepository.retrieveLimited(quantity);
while (!toProcess.isEmpty()) {
for (OutboxEntity item : toProcess) {
// Do some processing
logger.info("Processing item {}", item.getId());
// Mark as processed
item.setProcessed(true);
processed++;
}
// Flush pending changes
entityManager.flush();
// Clear memory
entityManager.clear();
logger.info("Processed {}", processed);
toProcess = outboxRepository.retrieveLimited(quantity);
}
return processed;
E a query:
public List<OutboxEntity> retrieveLimited(int quantity) {
return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.processed = FALSE", OutboxEntity.class)
.setMaxResults(quantity)
.getResultList();
}
Paginação na query
Essa maneira é muito similar com a primeira abordagem, exceto pelo fato de que não é necessário atualizar o registro para que o mesmo não retorne na query. Se o registro for atualizado e não retornar mais, o job não irá processar todos os registros pois os registros ficarão “mudando” de página conforme o processamento vai ocorrendo.
Segue um exemplo de como ficaria o job:
int processed = 0;
int pageNumber = 1;
Date creationDate = new Date();
List<OutboxEntity> toProcess = outboxRepository.retrievePaging(pageNumber++, quantity, creationDate);
while (!toProcess.isEmpty()) {
for (OutboxEntity item : toProcess) {
// Do some processing
logger.info("Processing item {}", item.getId());
// Mark as processed
item.setProcessed(true);
processed++;
}
// Flush pending changes
entityManager.flush();
// Clear memory
entityManager.clear();
logger.info("Processed {}", processed);
toProcess = outboxRepository.retrievePaging(pageNumber++, quantity, creationDate);
}
return processed;
E a query:
public List<OutboxEntity> retrievePaging(int pageNumber, int quantity, Date creationDate) {
return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate ORDER BY o.id", OutboxEntity.class)
.setParameter("creationDate", creationDate)
.setFirstResult((pageNumber - 1) * quantity)
.setMaxResults(quantity)
.getResultList();
}
Fetch size
Essa é a forma que menos vejo ser utilizada. Diferentemente das outras duas abordagens, essa só precisa executar a query no banco de dados apenas uma vez. Para garantir que o banco não irá retornar todos os registros de uma vez para aplicação, fazendo com que a memória da mesma se esgote, é necessário setar a propriedade fetch size no driver que faz a comunicação com o banco. Isso pode ser feito com um opção global, ou pode ser ajustada conforme a query que será executada.
Existem diversas maneiras de fazer isso. Utilizando JDBC puro, basta chamar o método setFetchSize do Statement com a quantidade de registros que devem ser trazidos por vez.
Utilizando Hibernate, deve ser utilizado o método setFetchSize da Query do Hibernate e, além disso, utilizar o método scroll que retorna uma instância da classe ScrollableResults. Essa classe se comporta de forma parecida com a classe ResultSet do JDBC.
No JPA 2.2, é possível obter um Stream através do método getResultStream, mas para ajustar o fetch size especificamente para a query sendo executada, é necessário utilizar o hint QueryHints.HINT_FETCH_SIZE na query.
Segue um exemplo de como ficaria o job com ScrollableResults do Hibernate:
int processed = 0;
Date creationDate = new Date();
ScrollableResults toProcess = outboxRepository.retrieveScroll(fetchSize, creationDate);
while (toProcess.next()) {
OutboxEntity item = (OutboxEntity) toProcess.get(0);
// Do some processing
logger.info("Processing item {}", item.getId());
// Mark as processed
item.setRetries(item.getRetries() + 1);
item.setProcessed(true);
processed++;
if (processed % quantity == 0) {
// Flush pending changes
entityManager.flush();
// Clear memory
entityManager.clear();
logger.info("Processed {}", processed);
}
}
logger.info("Processed {}", processed);
return processed;
E a query:
public ScrollableResults retrieveScroll(int fetchSize, Date creationDate) {
return entityManager.unwrap(Session.class).createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate", OutboxEntity.class)
.setParameter("creationDate", creationDate)
.setFetchSize(fetchSize)
.scroll();
}
Já com JPA 2.2, a implementação do job fica dessa maneira:
int processed = 0;
Date creationDate = new Date();
Iterator<OutboxEntity> iterator = outboxRepository.retrieveStream(fetchSize, creationDate).iterator();
while (iterator.hasNext()) {
OutboxEntity item = iterator.next();
// Do some processing
logger.info("Processing item {}", item.getId());
// Mark as processed
item.setRetries(item.getRetries() + 1);
item.setProcessed(true);
processed++;
if (processed % quantity == 0) {
// Flush pending changes
entityManager.flush();
// Clear memory
entityManager.clear();
logger.info("Processed {}", processed);
}
}
logger.info("Processed {}", processed);
return processed;
E da query:
public Stream<OutboxEntity> retrieveStream(int fetchSize, Date creationDate) {
return entityManager.createQuery("SELECT o FROM OutboxEntity o WHERE o.retries < 3 AND o.creationDate <= :creationDate", OutboxEntity.class)
.setParameter("creationDate", creationDate)
.setHint(QueryHints.HINT_FETCH_SIZE, fetchSize)
.getResultStream();
}
Exemplo real
Em uma das aplicações que trabalhei, havia um job que rodava a todo momento para verificar se alguns registros já haviam sido notificados. Ele rodava a query paginada e processava todo o conjunto de dados toda vez que executava. O plano de execução não era bom e já que ele tinha que processar todos os registros, não havia necessidade de paginar. A solução nesse caso foi trocar o job para obter os dados de uma vez só, utilizando o fetch size para que a memória da aplicação não se esgotasse.
O gráfico do banco de dados durante a alteração da aplicação para que o job rodasse com a query utilizando fetch size:
Como podemos ver, antes da alteração o load do banco de dados estava em aproximadamente 5, sendo que após a alteração o mesmo passou para 3. Além disso, conseguimos ver também uma melhora na CPU livre do banco de dados.
A alteração foi simplesmente a troca da classe org.springframework.batch.item.database.JpaPagingItemReader pela org.springframework.batch.item.database.HibernateCursorItemReader do Spring Batch.
Conclusões
Foram mostradas 3 abordagens, cada uma delas tem vantagens e desvantagens. Na abordagem por limitação, é necessário se preocupar em atualizar o registro processado para que ele não volte na próxima execução da query, entretanto, na paginação é necessário que o registro não seja alterado de forma que ele mude de página ou não seja mais retornado pela query.
Além disso, a principal diferença que vejo é a quantidade de vezes que a query será executada. Na abordagem com fetch size a query só é executada uma única vez. Isso é útil por exemplo em uma query onde o plano de execução é difícil de ser melhorado. No exemplo dado, o plano de execução da query paginada é o seguinte:
Essa query possui 2 ranges scan em colunas diferentes, e o sort por uma outra coluna. Nesse caso, o melhor seria rodar o job com o fetch size pois a query só executaria uma única vez, ao invés de diversas vezes dependendo de quantas páginas forem necessárias para o job executar.
O problema da abordagem com fetch size é que se o job demorar muito para ser executado, no Oracle a aplicação acaba recebendo um erro “ORA-01555: snapshot too old”. Este artigo não tem como intuito resolver esse problema, mas ele pode ser resolvido alterando alguns parâmetros no banco de dados, reduzir o tempo de processamento do job, ou uma terceira abordagem é fazer o commit a cada 1000 itens processados (dessa forma, na próxima execução do job, ele irá retomar o processamento de onde parou, eu não novamente do começo).
Os exemplos mostrados aqui estão em https://github.com/fabionb/artigo_job
Referências
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm
https://docs.jboss.org/hibernate/orm/3.5/api/org/hibernate/ScrollableResults.html