Sistemas Distribuídos

Projeto - TP1 (draft: 1/Mar:23h)

Engenharia Informática

Ano lectivo: 2017/2018, 2º Semestre

Prazos

  • Código: 14 Abril, 23h59 (entrega online)

  • Relatório: 16 Abril, 17h00, (na Secretaria)

Índice

  • Contexto
  • Objectivo
  • MapReduce (Paradigma)
  • Componentes do Sistema
    • Repositório
    • Motor MapReduce
    • Interfaces
  • Requisitos
    • Funcionalidades base
    • Funcionalidades opcionais
  • Ambiente de desenvolvimento e avaliação
  • Relatório & Entrega
  • Materiais de Apoio

Contexto

O processamento de dados é uma realidade no âmbito de muitas aplicações e serviços. Os dados produzidos no decurso do seu funcionamento mostraram ter valor, muito para além da funcionalidade e objetivos estritos dos sistemas particulares. Hoje, a informação é vista com uma matéria prima que precisa de ser explorada, por exemplo, visando o desenvolvimento de novos produtos ou afim de melhorar e optimizar aspectos de produtos já existentes.

No contexto de aplicações e serviços de grande âmbito, por exemplo à escala global, a carga induzida por um elevado número de utilizadores ou de operações concorrentes, obriga à adopção de soluções distribuídas, frequentmente, reocorrendo aos recursos de muitas máquinas. Nestes casos, a informação será também produzida de forma distribuída e poderá atingir uma dimensão considerável. Este aspecto torna pouco prático fazer o seu processamento de forma centralizada. Esta constatação levou, nos últimos anos, ao aparecimento de paradigmas de programação e sistemas e software dedicados ao processamento distribuído de informação, onde a lógica é levar o processamento até aos dados, às máquinas onde eles foram produzidos e estão armazenados (e não ao contrário). O MapReduce, inventado pela Google, é o modelo de processamento de dados distribuíddo que tem sido adoptado ou tem inspirado muitos sistemas dedicados a este problema.

Objectivo

O objetivo do trabalho é desenvolver um sistema de processamento distribuído de dados textuais, baseado no paradigma MapReduce.

Envolverá (1) implementar um repositório distribuído para armazenar os dados a processar e o código fonte Java dos programas MapReduce a executar; (2) adaptar uma solução MapReduce existente, centralizada, afim de tirar partido da distribuição.

MapReduce (Paradigma)

Um programa MapReduce centra-se em duas funções: map e reduce. Usualmente, a função map é usada para transformar ou filtrar os dados de entrada, enquanto que a função reduce produz uma agregação dos dados intermédios produzidos pela função map.

Estas funções operam sobre tuplos (chave, valor) e emitem zero ou mais tuplos (chave, valor) do mesmo ou de outro tipo. A função map é alimentada com os tuplos de entrada, por uma ordem arbitrária. Porém, a cada invocação da função reduce são processados todos os valores dos tuplos com a mesma chave, emitidos pela função map.

Este modelo é compatível com uma execução distribuída (e paralela). Numa primeira fase, dados localizados em múltiplas máquinas podem ser processados localmente pela função map, em paralelo. Consequentemente, em cada máquina serão emitidos um número arbitrário de novos tuplos (chave, valor). Posteriormente, os tuplos emitidos na fase anterior que partilham a mesma chave devem ser reunidos e os seus valores entregues ao mesmo reducer, podendo chaves distintas ser executadas em paralelo (e em máquinas distintas). Note-se que o resultado da fase reduce poderá estar particionado, caso tenham sido produzidos por mais do que uma instância reduce.

Quando a computação desejada é demasiado complexa para realizar num só passo MapReduce, é possível encadear programas MapReduce, usando os resultados do passo anterior para alimentar o seguinte.

Exemplo (Frequência de Palavras)

In [ ]:
public class WordCount extends MapReducer<String, String, String, Long>{

    public void map(String blob, String line ) {
        for( String word : line.split("(?U)[\\p{Space}\\p{Punct}]"))
            super.yield( word.toLowerCase(), 1L);
    }

    public void reduce( String word, Iterable<Long> occurrences ) {
        long sum = 0;
        for( long v : occurrences )
            sum += v;
        super.yield( word, sum );
    }
}

Componentes do Sistema

Repositório

O repositório de dados está limitado a dados textuais. A unidade de armazenamento é o blob de texto. Um blob é caracterizado por um nome e é composto por um número arbitrário (potencialmente muito grande) de linhas de texto.

Os nomes dos blobs podem ter um comprimento variável, mas o espaço de nomes é plano, ie., sem hierarquia explícita.

Além de permitir ler e escrever um blob particular, dado o nome, o repositório permite listar (ou apagar), de uma só vez, todos os blobs cujos nomes têm um determinado prefixo.

Devido à dimensão arbitraria (potencialmente muito grande) dos blobs, estes serão armazenados em blocos de texto. Os blocos de texto terão uma dimensão máxima (em bytes) limitada, garantindo-se ainda que os blocos armazenam linhas de texto completas. Portanto, a dimensão máxima das linhas que é possível armazenar no repositório está limitada pela dimensão máxima do bloco permitida.

Arquitectura

O respositório compreende dois tipos de componentes, um Namenode e vários Datanodes. Os Datanodes têm como função armazenar blocos de texto, correspondentes ao conteúdo dos blobs. O Namenode serve para guardar o nome dos blobs existentes no repositório e quais os blocos que os compõem. Um blob poderá ter o seu conteúdo espalhado por vários Datanodes.

Motor MapReduce

O motor MapReduce opera exclusivamente sobre blobs armazenados no repositório. Os dados de entrada dos programas MapReduce a executar consistem na informação contida nos blobs, cujos nomes partilham um dado prefixo. Os resultados da execução serão também escritos no repositório, resultando em zero ou mais blobs com nomes gerados a partir do prefixo dado. Resultados intermédios (produzidos na fase map) podem ser temporariamente armazenados no repositório.

O código fonte Java dos programas MapReduce será também lido (e compilado) a partir de um blob previamente colocado no repositório.

Interfaces de Programação

Parte dos sistema será desenvolvido usando tecnologia REST e (opcionalmente) em WebServices SOAP. O sistema deverá respeitar as seguintes interfaces de programação. Se necessário, estas poderão ser estendidas com novas operações, mas será imperativo respeitar a semântica das existentes.

In [ ]:
%%classpath add mvn 
com.google.code.gson gson 2.2.4
org.glassfish.jersey.core jersey-common 2.25.1
org.glassfish.jersey.core jersey-client 2.25.1
org.glassfish.jersey.core jersey-server 2.25.1
org.glassfish.jersey.media jersey-media-json-jackson 2.25.1
org.glassfish.jersey.containers jersey-container-jdk-http 2.25.1

Repositório (Serviços Namenode & Datanode)

Datanode

In [ ]:
package api.storage;

import javax.ws.rs.*;
import javax.ws.rs.core.*;

@Path( Datanode.PATH ) 
public interface Datanode {

    static final String PATH = "/datanode";
    
    @POST
    @Path("/")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_OCTET_STREAM)
    String createBlock(byte[] data);
    // 200 OK
In [ ]:
@DELETE
    @Path("/{block}")
    void deleteBlock(@PathParam("block") String block);
    // 204 No Content | 404 Not Found
    
    @GET
    @Path("/{block}")
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    byte[] readBlock(@PathParam("block") String block);
    // 200 OK | 404 Not Found
}

Namenode

In [ ]:
package api.storage;

import java.util.*;

import javax.ws.rs.*;
import javax.ws.rs.core.*;

@Path(Namenode.PATH)
public interface Namenode {

    static final String PATH="/namenode";
    
    @GET
    @Path("/list/")
    @Produces(MediaType.APPLICATION_JSON)
    List<String> list( @QueryParam("prefix") String prefix);
    // 200 OK [empty List]
    
    @GET
    @Path("/{name}")
    @Produces(MediaType.APPLICATION_JSON)
    List<String> read(@PathParam("name") String name);
    // 200 OK | 404 Not Found
In [ ]:
@POST
    @Path("/{name}")
    @Consumes(MediaType.APPLICATION_JSON)
    void create(@PathParam("name") String name, List<String> blocks);
    // 204 No Content | 409 Conflict
    
    @PUT
    @Path("/{name}")
    @Consumes(MediaType.APPLICATION_JSON)
    void update(@PathParam("name") String name, List<String> blocks);
    // 204 No Content | 404 Not Found

    @DELETE
    @Path("/list/")
    void delete( @QueryParam("prefix") String prefix);
    // 204 No Content | 404 Not Found
}

Repositório (Cliente)

As operações disponibilizadas pelos serviços REST do Namenode e Datanodes estão complementadas por uma biblioteca cliente, destinada a oferecer operações de alto-nível sobre blobs às aplicações. É sobre esta biblioteca que opera o motor MapReduce fornecido.

In [ ]:
package api.storage;

import java.util.List;

public interface BlobStorage {
    
    List<String> listBlobs( String prefix );
    
    void deleteBlobs( String prefix );
    
    BlobReader readBlob( String name  );

    BlobWriter blobWriter( String name );

    interface BlobReader extends Iterable<String> {
        String readLine();
    }
    
    interface BlobWriter {
        void writeLine(String line);
        void close();
    }
}

Motor MapReduce

O modelo de programação suportado pelo motor MapReduce obedece à seguinte interface.

Suporta tuplos de tipos genéricos, mas existe suporte para a noção de combiner.

Nota: O desenvolvimento de programas MapReduce não faz parte dos objetivos. Serão fornecidos programas de teste.

In [ ]:
package api.mapreduce;

import java.util.stream.Stream;

public interface MapReduce<MK, MV, RK, RV> {
    
    void map_init();

    void map(MK key, MV val);
    
    void map_end();

    void reduce_init();
    
    void reduce(RK key, Stream<RV> val);
    
    void reduce_end();
    
    <K,V> void yield( K key, V val) //usado nos programas MapReduce para emitir tuplos (chave, valor)
}

Execução Centralizada (dos Programas MapReduce)

Uma computação MapReduce tem como base um código fonte, escrito em Java, e guardado no repositório num blob. Os dados de entrada da computação serão obtidos do repositório e têm como alvo um número variável de blobs cujo nome contém um prefixo dado como parâmetro da computação. Eventuais dados de saída serão escritos num ou mais blobs, construindo o seu nome também com base num prefixo.

In [ ]:
void MapReduceEngine.execute( String jobClassBlob, String inputPrefix , String outputPrefix );

A versão centralizada do motor de execução fornecido caracteriza-se por correr integralmente no cliente que puxa os dados integralmente para executar os programas MapReduce. Esta versão tem como objetivo descrever as fases de execução MapReduce e como o repositório é explorado para esse fim. Nomeadamente, ilustra como se tira partido da possibilidade dada pelo repositório de agregar nomes de blobs com base num prefixo.

Quando é lançada uma execução, começa pela fase map. Uma única tarefa com esse fim irá iterar sobre todas as linhas de todos os blobs cujo nome contém o prefixo indica como parâmetro da execução. Para cada uma dessas linha é invocada a função map do programa MapReduce (este é compilado nesse momento). Esta função irá gerar tuplos (chave, valor), tipados de acordo com o código do programa. Cada chave irá traduzir-se na criação de um novo blob, visando recolher os valores dos tuplos emitidos que partilhem essa chave. O nome do blob gerado para cada chave tem o seguinte formato:

<output-prefix>-map-<key>-<worker>

O valor de <key> resulta de duas codificações. Primeiro, o valor da chave é convertido para JSON e esse resultado é codificado em Base52. Desta maneira, <key> será composto apenas por letras e números. O sufixo <worker> na versão centralizada toma o valor de "client" mas é livre, desde que resulte num nome válido (para o repositório).

Terminada a fase map, terão sido produzidos e guardados no repositório tantos blobs, quantas as chaves que foram emitidas. Esses blobs podem ser enumerados interrogando o repositório pelos nomes dos blobs com o prefixo: <output-prefix>-map-

Segue-se a fase reduce. Para cada chave, antes de poder invocar a operação reduce, é necessário selecionar todos os valores que a partilham. As chaves a processar são determinadas por via dos nomes dos blobs produzidos na fase map. Correspondem ao valor tomado por <key> nos nomes dos blobs relevantes. Para cada chave e os respetivos valores é invocada a função reduce do programa. Os resultados da função reduce são recolhidos em blobs cujo nome tem o seguinte formato:

<output-prefix>-reduce-<key>-<worker> ou <output-prefix>-reduce-<partition>-<worker>, dependendo da configuração do motor.

O primeiro caso, corresponde a emitir um blob com os resultados da função reduce para cada chave individual. No segundo caso, o conjunto das chaves que serão alvo da função reduce é dividido em partições (disjuntas) e processam-se as chaves de cada partição sequencialmente, indo os resultados de cada partição para um mesmo blob.

Sendo uma versão centralizada, o motor de execução está aquém do possível. explora o facto de a leitura e escrita de blobs com nomes distintos poder fazer-se em paralelo.

Requisitos da Solução

A solução a desenvolver deve implementar um conjunto de funcionalidades base conformes com a especificação do sistema presente neste enunciado. A título valorativo, poderão ser implementadas outras funcionalidades.

Funcionalidades Base

  • Serviços Namenode & Datanode implementados usando tecnologia REST Jersey JAX-RS; €€€
    • Estado dos Datanodes persistente em memória secundária local (disco); €
  • Biblioteca cliente funcional para acesso a um repositório remoto constituído por um Namenode e vários Datanodes; €€
  • Tolerar falhas de comunicação temporárias; €

Nota: Cumprindo os requisitos acima, o motor MapReduce fornecido deverá poder operar com um repositório remoto (ainda que a computação se faça centralizadamente no cliente)

Funcionalidades Opcionais (Valorativas)

  • Motor MapReduce descentralizado. €€€
    • Suporte da execução remota (e distribuída) do motor MapReduce, conforme a interface WebServices SOAP indicada abaixo. €€
    • Explorar a afinidade entre dados e as tarefas MapReduce (localidade dos dados) €€€
  • Garbage Collection. €
    • Remoção de blocos não alcançáveis, não pertencentes a nenhum blob presente no Namenode.
In [ ]:
package api.mapreduce;

import javax.jws.*;

@WebService
public interface ComputeNode {
    static final String PATH = "/mapreduce";
    static final String NAME = "ComputeService";
    static final String NAMESPACE = "http://sd2018";
    static final String INTERFACE = "api.mapReduce.ComputeNode";

    @WebMethod
    void mapper( String jobClassBlob, String inputPrefix , String outputPrefix );
    
    @WebMethod
    void reducer( String jobClassBlob, String inputPrefix , String outputPrefix );
    
    @WebMethod
    void mapReduce( String jobClassBlob, String inputPrefix , String outputPrefix );
}

Desenvolvimento

A solução deverá ser desenvolvida num ambiente tão próximo quanto o possível ao ambiente Linux existente nos laboratórios, seja em modo nativo ou virtualizado. Tendo em conta que o trabalho será discutido, individualmente, em princípio, nas máquinas dos laboratórios e só nestas, é imperativo que o "pipeline" de desenvolvimento esteja perfeitamente dominado. Nessa ocasião poderá ser necessário alterar o código, testar e obter novos resultados, num espaço de tempo limitado...

Para agilizar e uniformizar o processo de desenvolvimento e teste, o trabalho deverá ser entregue de forma a ser compilado e empacotado numa imagem Docker, por recurso ao Maven, usando as ferramentas apresentadas nas aulas práticas.

Será disponibilizada uma bateria de testes destinada a validar a conformidade do trabalho com a especificação apresentada neste documento. A passagem dos testes é condição necessária mas insuficiente para garantir a correção do trabalho. Este deve ser realizado com a especificação em mente e não para passar os testes.

Uma vez fixada a especificação com a publicação definitiva do trabalho, existe o compromisso que eventuais alterações serão pontuais e destinadas a clarificar alguma ambiguidade ou deficiência na especificação. Reitera-se que a existência de testes pretende ajudar a despistar problemas de conformidade com a especificação. Nesse sentido, a bateria de testes poderá ser atualizada e em caso algum representará novos requisitos.

Relatório e Entrega

As instruções relativas à entrega do trabalho e do relatório serão, oportunamente, adicionadas ao anexo disponibilizado sob a forma de um documento Google Docs à parte.

O mesmo anexo será também a forma preferêncial para disseminar qualquer informação adicional relevante ao contexto da realização do trabalho prático. Recomenda-se a consulta períodica do referido documento.

Materiais de Apoio