Elasticsearch w projektach Java – wzbogacanie danych

Mariusz Maślanka

Zaktualizowaliśmy ten tekst dla Ciebie!
Data aktualizacji: 31.12.2024
Autor aktualizacji: Piotr Merynda

W poprzednich artykułach przedstawiono sposób indeksowania i odpytywania dokumentów przy użyciu pojedynczego indeksu. Wyobraźmy sobie bardziej realny scenariusz, w którym istotne informacje są przechowywane w kilku indeksach i dane powinny zostać połączone. Jednym z podejść jest odczytywanie dokumentów z każdego indeksu osobno i używanie czystej Javy do łączenia wyników, jednak może to być podatne na błędy. Drugie rozwiązanie jest dostarczane bezpośrednio z Elasticsearch i nazywa się wzbogacaniem danych (ang. data enrichment), gdzie dane z indeksów źródłowych mogą wzbogacać wczytane dokumenty. Niniejszy artykuł przedstawia, czym jest wzbogacanie danych, jak można je skonfigurować i wykorzystać w projektach Java w praktyce.

Data Enrichment

Procesor wzbogacania danych został wprowadzony w Elasticsearch 7.5.0. Może być używany do dodawania danych z istniejących indeksów do przychodzących dokumentów podczas ich pozyskiwania. W przypadku aplikacji demonstracyjnej dane sterowników są wzbogacone o statystyki.

Potok pozyskiwania (ang. ingest pipeline)

Zestaw procesorów, które mogą być używane do manipulowania danymi podczas pozyskiwania bez większego narzutu. Każdy procesor wykonuje określone zadanie, np. wzbogacanie.

polityka wzbogacania (ang. enrich policy)

Zestaw opcji konfiguracyjnych używanych do dodawania właściwych danych wzbogacających do właściwych dokumentów przychodzących. Zawiera:

  • listę jednego lub więcej indeksów źródłowych, które przechowują wzbogacone dane jako dokumenty
  • Pole dopasowania z indeksów źródłowych używane do dopasowywania dokumentów przychodzących
  • Pola wzbogacające zawierające dane wzbogacające z indeksów źródłowych, które chcesz dodać do dokumentów przychodzących.

Przed wykorzystaniem procesora wzbogacania, musi zostać wykonana polityka wzbogacania. Po wykonaniu polityka wzbogacania wykorzystuje dane wzbogacania z indeksów źródłowych polityki do utworzenia usprawnionego indeksu systemowego zwanego indeksem wzbogacania. Procesor używa tego indeksu do dopasowywania i wzbogacania przychodzących dokumentów.

indeks źródłowy (ang. source index)

Indeks, który przechowuje dane wzbogacające dodawane do dokumentów przychodzących. W polityce wzbogacania może być używanych wiele indeksów źródłowych lub ten sam indeks źródłowy w wielu politykach wzbogacania.

indeks wzbogacania (ang.enrich index)

Specjalny indeks systemowy tylko do odczytu powiązany z określoną polityką wzbogacania, który zawsze zaczyna się od .enrich-*. Jest on używany przez procesor wzbogacania w celu przyspieszenia procesu i nie powinien być używany do innych celów.

Projekt demonstracyjny

W aplikacji demonstracyjnej wprowadzono kolejny zestaw danych – statystyki kariery kierowców. Są one przechowywane w osobnym indeksie db-statistics. Po zindeksowaniu dokumentów kierowców należy je wzbogacić o informacje o statystykach.

Konfiguracja procesora wzbogacającego w Java API

Konfiguracja procesora wzbogacania wymaga wykonania kilku kroków, które opisano poniżej wraz z kilkoma fragmentami kodu.

1. utworzenie polityki wzbogacania

Pierwszym krokiem jest utworzenie polityki wzbogacania przy użyciu obiektu EnrichClient, który jest uzyskiwany z wysoko poziomowego klienta REST. Ważne jest, aby wspomnieć, że może istnieć tylko jedna polityka o podanej nazwie, więc wymagane jest sprawdzenie, czy istnieje już polityka o podanej nazwie. W przeciwnym razie Elasticsearch rzuci wyjątek po wykryciu próby utworzenia zduplikowanej polityki. Do utworzenia polityki potrzebne będą:

  • nazwa polityki;
  • typ;
  • lista indeksów źródłowych
  • pole, pod którym dopasowywane są dane
  • lista wzbogaconych pól;

Po utworzeniu polityki wzbogacania nie można jej aktualizować ani zmieniać. Należy ją po prostu usunąć i w razie potrzeby utworzyć ponownie.

private final RestHighLevelClient client;

EnrichClient enrichClient = client.enrich();

GetPolicyResponse policy = enrichClient.getPolicy(new GetPolicyRequest(policyName), RequestOptions.DEFAULT);

if (policy.getPolicies().isEmpty()) {
PutPolicyRequest policyRequest = new PutPolicyRequest(policyName, "match",
    List.of(sourceIndex),
    matchField,
    enrichedFields);
enrichClient.putPolicy(policyRequest, RequestOptions.DEFAULT);
}

2. wykonanie polityki wzbogacania

Po utworzeniu polityki wzbogacania należy ją wykonać, aby utworzyć indeks wzbogacania. Odbywa się to tak samo jak powyżej przy użyciu tego samego obiektu EnrichClient.

enrichClient.executePolicy(new ExecutePolicyRequest(policyName), RequestOptions.DEFAULT)

3. dodanie procesora wzbogacania do potoku pozyskiwania

Następnym krokiem jest zdefiniowanie procesora wzbogacania wraz z wcześniej utworzoną polityką w potoku pozyskiwania. Dla procesora wzbogacającego zdefiniowane jest pole dopasowania i pole docelowe. W aplikacji demonstracyjnej wzbogacone informacje są przechowywane we właściwości „statistics”.

Jak wspomniano wcześniej, potok pozyskiwania może zawierać jeden lub więcej procesorów. Kolejny remove można zobaczyć na poniższym fragmencie kodu.

{
  "description": "Enrich drivers statistics information",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "enrich-statistics-policy",
        "field" : "driverId",
        "target_field": "statistics"
      }
    },
    {
      "remove": { "field": "statistics.driverId"
      }
    }
  ]
}
public void preparePipeline(String pipelineName, String pipelinePath) {
    IngestClient ingestClient = client.ingest();

    try {
      InputStream inputStream = new ClassPathResource(pipelinePath).getInputStream();
      byte[] bytes = ByteStreams.toByteArray(inputStream);
      PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(bytes), XContentType.JSON);
      ingestClient.putPipeline(putPipelineRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
      log.error("Error during creating pipeline: {}", e.getMessage(), e);
    }
  }

4. Dodanie potoku pozyskiwania podczas tworzenia nowego indeksu

Aby uruchomić potok po zapisaniu nowych dokumentów, wystarczy dodać informacje o potoku do żądania IndexRequest. W rezultacie, gdy nowe dokumenty są indeksowane, są one wzbogacane o wymagane dane.

new IndexRequest(indexName).source(source).setPipeline(pipeline)

Podsumowanie

Proces wzbogacania danych jest niezwykle przydatną funkcją Elasticsearch, która pozwala na dodanie dodatkowych właściwości z istniejących indeksów podczas indeksowania nowych dokumentów. Artykuł wyjaśnia, czym jest wzbogacanie danych, jak działa i pokazuje, jak wprowadzić je do projektu Java używając Java API

Źródła

Poznaj mageek of j‑labs i daj się zadziwić, jak może wyglądać praca z j‑People!

Skontaktuj się z nami