Neste artigo vou demonstrar como otimizar a extração e carga de uma grande massa de dados (bulk load) através do Talend. Para a maioria das tecnologias de bancos de dados suportadas pelo Talend existem os componentes BulkOutput, BulkExec e OutputBulkExec, estes componentes permitem a extração para arquivo (OutputBulk), carga do arquivo utilizando o método de load do próprio banco (BulkExec) e extração e carga utilizando o mesmo componente (OutputBulkExec).
O job de demonstração abaixo foi desenvolvido para a extração de dados de um banco de dados Oracle e carga em um Sybase IQ, mas alguns tweaks foram necessários para otimizar da performance e realizar bulk load com o Talend, pois apenas esta tabela tinha um volume na casa dos terabytes.
Implementando bulk load no Talend
Para começar, primeiramente estabelecemos as conexões com os databases. Como este job executa mais de um subjob em paralelo (veremos adiante como ativar multithreads) temos de garantir que as conexões sejam estabelecidas antes da execução de qualquer outro subjob, para isso utilizamos o componente tPrejob, e por segurança caso a última execução tenha terminado sem sucesso deletamos os arquivos extraídos utilizando os componentes tFileList e tFileDelete. Além do tPrejob, existe o tPostjob, que garante que os subjobs ligados a ele através de triggers sejam executados somente após a execução dos demais subjobs não ligados a ele.
Um subjob é um segmento do job que pode ser executado independentemente da execução de outros segmentos, ou pode estar conectado a outros através de triggers. Graficamente eles são delimitados por retângulos de fundo azul claro, laranja ou outra cor caso você tenha customizado.
Estabelecidas as conexões e removidos os arquivos de execuções anteriores, damos início à extração e carga de nossos dados.
O subjob logo abaixo do Prejob executa uma query no nosso banco destino, neste caso um Sybase IQ, obtendo o maior ID da tabela que pretendemos carregar, desta forma podemos extrair de nossa fonte apenas os registros novos, cujo ID seja maior que o carregado na última execução. O resultado desta query é propagado ao componente tJavaRow, que armazena o valor em uma variável de contexto.
Trabalhar com o tJavaRow é simples: clique em Sync Columns e então em Generate code, e o Talend irá gerar um código geralmente no padrão
output_row.MAX = input_row.MAX;
onde MAX é o nome de um dos campos do schema do componente (neste caso há apenas um campo), output_row é a saída do tJavaRow e input_row é a entrada, o que ele está recebendo do componente predecessor. Mas como não pretendemos propagar dados a partir deste componente, substituímos output_row por context.NOME_DA_VARIAVEL ficando desta forma:
context.NOME_DA_VARIAVEL = input_row.MAX;
Para criar variáveis de contexto vá até a view Contexts, adicione suas variáveis, defina seus tipos e valores default, você pode ver mais detalhes neste artigo: Usando variáveis de contexto.
Extraído este valor máximo e armazenado em uma variável, partimos para o subjob de extração dos dados da fonte e para garantir que esta etapa seja executada somente após a execução da etapa anterior, ligamos os dois subjobs com a trigger OnSubjobOk.
Este banco de dados que utilizamos como origem é um Oracle RAC (Real Application Cluster), um cluster de databases Oracle para garantir a alta disponibilidade. Como o Talend ainda não suporta a conexão a RAC especificando mais de um host (especificando um apenas funciona, mas não garante a alta disponibilidade) utilizamos uma conexão do tipo General JDBC, onde podemos editar a URL de conexão livremente (O suporte a conexão com RACs Oracle utilizando os componentes desta tecnologia está previsto para a versão 5.0).
Então editamos a query no database de origem, para trazer apenas os dados onde o ID seja maior do que o presente no database destino. Para isso selecione o componente de input e vá em Query, lá edite a query para que ela fique semelhante a esta:
"SELECT ... ... FROM ORIGEM WHERE ID > " + context.VARIAVEL
Observe como adicionamos a variável de contexto à query: como no fim das contas isso é uma String para a linguagem Java, colocamos a parte constante entre aspas e então concatenamos o conteúdo da variável, que será definido em tempo de execução, utilizando o operador +.
Dica de desempenho: para obter o máximo do bulk load no Talend, em alguns componentes de Input de algumas tecnologias existem opções que permitem otimizar a forma como os dados são extraídos. Nos componentes Oracle e JDBC (dentre outros) existe a opção Enable Cursor que em alguns casos permite obter um ganho de performance considerável. Já no componente de input do MySQL existe a opção Enable stream que também pode incrementar sua performance na extração de dados. Mas atenção: com a opção de stream no Mysql não é possível utilizar uma única conexão em modo multithread com a opção Use an existing connection.
E como modo de armazenamento intermediário destes dados obtidos do database de origem utilizamos arquivo(s) de texto puro. Neste caso utilizaríamos normalmente um componente OutputBulk, mas ao invés disso utilizamos um componente tFileOutputDelimited, que no fim das contas tem o mesmo propósito: gerar um arquivo de texto puro, mas além disso possui uma opção interessante: na guia Advanced do componente ativamos a opção Split output in several files que fará surgir uma outra opção chamada Rows in each output file, com estas opções especificamos que o conjunto de registros obtidos da fonte deverá ser dividido em vários arquivos com um número máximo de registros, cujo valor é definido por você. Neste caso defini um limite de 100 milhões de registros por arquivo. Assim teremos vários arquivos com o nome PRODUCTS1.dat, PRODUCTS2.dat…
O propósito de se dividir o resultset em vários arquivos você entenderá agora com a descrição do outro subjob, que é executado em paralelo ao que foi descrito até aqui.
Este subjob executa enquanto o outro subjob não tiver concluído sua execução. Para isso utilizamos o componente tLoop, onde podemos definir o tipo de Loop que pretendemos executar (for ou while) e as condições de sua execução. O loop “for” é recomendado quando você consegue estimar o número de execuções e o loop “while”, como o próprio nome já diz, é recomendado quando você deseja executar ENQUANTO uma condição for verdadeira. Utilizaremos o loop while e mais à frente irei mostrar como construíremos a condição de execução.
Após o tLoop temos o componente tSleep que permite pausarmos a execução deste subjob por um tempo pré determinado, neste caso um minuto. Observe que a ligação entre o tLoop e tSleep é feita através da linha Iterate, pois não há fluxo de dados sendo transmitido, apenas uma repetição de execuções.
Quando o componente tSleep termina a sua execução é chamado o componente tFileList através da trigger OnComponentOk, ou seja, quando a execução apenas deste componente (tSleep) tiver sido concluída, e então o tFileList irá listar o conteúdo de um diretório especificado por você, sendo que é possível adicionar uma máscara para os arquivos listados para que você filtre o resultado.
Estamos listando o conteúdo do diretório onde o componente tFileOutputDelimited está criando arquivos, então definimos uma máscara com o formato de nome que esperamos para estes arquivos, neste caso “PRODUCTS*.dat”, onde * é um caractere curinga onde esperamos a sequència de números que o tFileOutputDelimited irá inserir na criação dos arquivos.
O resultado dos arquivos listados pode ser obtido através de um loop mais uma vez utilizando a linha Iterate, desta forma conectamos o componente tFileList e tSystem.
Através do componente tSystem executamos comandos do sistema, independente do sistema onde o job está sendo executado, Windows, Linux, FreeBSD… quem sabe outros.. E neste especificamente executamos um comando pertencente a sistemas Unix e similares: o lsof, que lista os arquivos abertos no momento por algum processo.
Observe o comando sendo executado:
"sudo /usr/sbin/lsof " + ((String)globalMap.get("tFileList_1_CURRENT_FILEPATH"))
Chamamos o comando lsof e concatenamos à String o caminho do arquivo atual listado pelo componente tFileList, mas como obter este caminho?
Observe que no canto esquerdo inferior da suite, abaixo do repositório, existe uma view chamada Outline. Dentro desta view existem variáveis de retorno dos componentes que você está utilizando no job atualmente aberto, como número de linhas inseridas em um arquivo ou erro retornado por um componente. Para utilizar estas variáveis você simplesmente seleciona e arrasta para o local desejado, um campo de código livre como no tJava ou tJavaRow ou mesmo um campo de opção de algum componente, como iremos fazer agora.
Para obter os nomes dos arquivos que listamos no componente tFileList, incluindo o caminho de sua localização, e concatenar esta String ao comando lsof, vamos até a view Outline, expandimos a visão do componente que desejamos, neste caso o tFileList_1 e arrastamos a variável Current File Name with path para o campo onde executamos o comando do sistema no componente tSystem, concatenando as Strings com o operador +.
Atenção: Tome o cuidado de observar o nome único do componente na view Outline, neste job temos por exemplo os componentes tFileList_1 e tFileList_2. Você pode obter o nome único do componente clicando no mesmo e olhando o cabeçalho da view Component.
Agora que já listamos os arquivos criados naquele diretório e executamos o comando lsof sobre a lista obtida, partimos para a carga em si destes arquivos no database destino e então você entenderá o por que de tudo isso.
Como estamos gerando vários arquivos separados em 100 milhões de registros e em paralelo temos um subjob responsável pela carga destes arquivos no Sybase IQ, temos de garantir que a carga dos arquivos extraídos somente seja executada após a conclusão destes, ou seja, após o seu fechamento. Para isso executamos o comando lsof e, dependendo do resultado, saberemos se o arquivo está fechado, significando que o mesmo está pronto para carga, ou aberto, significando que a extração de registros para este arquivo ainda não foi concluída. Este passo é essencial para implementarmos essa tática criativa de bulk load com o Talend.
Então utilizamos o componente BulkExec da tecnologia destino, aqui um Sybase IQ, para carregar o arquivo do loop atual. Neste componente você define o schema da tabela destino e aponta para o arquivo a ser carregado, mais uma vez obtendo o nome do arquivo e caminho listado no tFileList através da view Outline.
E para garantir que sejam carregados apenas os arquivos fechados, conectamos o componente tSystem ao tSybaseIQBulkExec utilizando a trigger If, onde poderemos especificar uma condição para execução do componente sucessor. Como condição utilizamos o retorno do componente tSystem, que caso seja igual a 1 indica que o arquivo atual não foi listado, ou seja, não está aberto por nenhum processo e então podemos carregá-lo.
Por fim, chamamos os componentes tSybaseCommit e tFileDelete logo após a carga do arquivo, que como os próprios nomes descrevem, realizam o Commit da transação e exclusão do arquivo carregado (mais uma vez utilizamos a variável Current file with path do componente tFileList).
Agora os detalhes adicionais para conclusão deste job. Como condição de execução do componente tLoop, queremos que este subjob seja executado enquanto houver algum arquivo a carregar, então irei lhe mostrar como construir esta condição:
Procure pelo nome único do componente tFileOutputDelimited dentro de Outline (neste caso provavelmente haverá apenas tFileOutputDelimited1) e arraste a variável Number of line para o campo Condition do componente tLoop e compare o seu valor com null, ficando desta maneira:
(((Integer)globalMap.get("tFileOutputDelimited_1_NB_LINE")) == null)
Desta forma garantimos que o subjob continue sua execução mesmo se o seu início se der antes do subjob de extração dos dados, pois o número de linhas retornado pelo componente tFileOutputDelimited será igual a null. Mas isto não é suficiente: devemos agora testar se o componente tFileList retornou algum arquivo e para isto arrastamos a variável Number of files da view outline e separamos as condições pelo operador || , que significa OU. Ainda, damos uma margem caso o componente tFileList ainda não tenha sido executado, comparando o conteúdo da variável Number of files também com null e aglomeramos estes dois testes entre parenteses para que seja retornato true (verdadeiro) caso um ou o outro retorne true, mesmo que o outro teste retorne false.
(((Integer)globalMap.get("tFileList_1_NB_FILE")) >= 1 || ((Integer)globalMap.get("tFileList_1_NB_FILE")) == null)
Juntos, os três testes com compoem a condição do loop ficarão assim:
(((Integer)globalMap.get("tFileOutputDelimited_1_NB_LINE")) == null) || (((Integer)globalMap.get("tFileList_1_NB_FILE")) >= 1 || ((Integer)globalMap.get("tFileList_1_NB_FILE")) == null)
Para concluir, utilizamos o componente tPostJob para conectar os componentes tSybaseClose e tJDBCClose, que como os nomes já descrevem, fecham as conexões estabelecidas logo no início da execução do job.
Um último detalhe antes de pressionar F6 é habilitar a execução multithread. Vá até a view Job, entre em Extra e habilite a opção Multithread Execution.
Conclusão
Para a maioria das cargas do tipo bulk load no Talend, quando se tratam de alguns megabytes não há a necessidade de um job como este, provavelmente você utilizará um “input” e um “outputbulkexec“, mas conforme o volume de dados aumenta, a complexidade dos jobs responsáveis por sua carga se eleva proporcionalmente se você quiser incrementar a performance.
Com esta abordagem de bulk load no Talend, o espaço em disco não precisa ser uma preocupação: imagine extrair alguns terabytes de dados em um único arquivo para só então carregar este arquivo, enquanto você pode consumir apenas alguns gigabytes para isso!
Além do espaço em disco, há um ganho de tempo considerável, pois o database destino não precisa ficar horas (ou dias!) esperando a extração ser concluída para só então realizar a carga, enquanto pode fazê-la em partes.
Por fim, a carga de dados a cada 100 milhões de registros torna o processo mais simples de se recuperar de qualquer execução falha: imagine se a 99% da extração dos dados da fonte a rede vai abaixo? Você tem a segurança de que ao menos algo em torno de 95% já tenha sido carregado no destino com sucesso e é esse o motivo de excluírmos quaisquer arquivos existentes no início do processo: pode haver algum registro truncado e então extraímos apenas os registros cujo ID seja maior do que o último carregado no destino.