Introducing Elasticsearch as a Structured Search Provider

11.05.2018
Peer Müller

Full text search is a crucial feature for any website that contains a considerable amount of data. If requested information cannot be found or it takes too much time to browse through endless lists, customers will probably turn their backs on your product. Therefore, developing a full text search feature for our customer was a priority from the beginning.

Why Elasticsearch?

To implement full-text search, we first used the full-text search functionality of MS SQL Server, but quickly reached its performance limits. Just three to four simultaneous searches had a significant impact on the database and, as a consequence, slowed down the application. It became clear that this solution would not scale.

Example of a low-end database production resource configuration

Therefore, based on previous positive experience, we decided to introduce Elasticsearch as the backend search provider. This choice brought additional benefits, such as typo tolerance for search terms (fuzziness) and phonetic matching. Both increase recall, that is, they spread the net wide enough to catch documents that might possibly match.

The German surname "Müller" is returned for the search term "Miller" thanks to phonetic matching (the phonetic analyzer generates the token "MLR" for both terms)
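The typo tolerance mentioned above is based on edit distance. The following minimal Java sketch, which is not Elasticsearch's actual implementation, computes the classic Levenshtein distance and applies the length-based edit budget of fuzziness "AUTO" (exact match for terms of up to 2 characters, one edit for 3 to 5 characters, two edits for longer terms). Elasticsearch additionally counts a transposition of two adjacent characters as a single edit by default, which this sketch omits. The class and method names are hypothetical.

```java
final class FuzzyMatch {

    // Classic Levenshtein distance: the minimum number of single-character
    // insertions, deletions or substitutions needed to turn a into b
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // turning "" into the first j characters of b takes j insertions
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i; // turning the first i characters of a into "" takes i deletions
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev; // reuse the two rows instead of a full matrix
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    // Edit budget of fuzziness "AUTO" (equivalent to AUTO:3,6):
    // 0..2 characters must match exactly, 3..5 allow one edit, longer terms two
    static int autoFuzziness(String term) {
        int length = term.length();
        if (length < 3) {
            return 0;
        }
        return length < 6 ? 1 : 2;
    }

    static boolean fuzzyMatches(String searchTerm, String indexedTerm) {
        return levenshtein(searchTerm, indexedTerm) <= autoFuzziness(searchTerm);
    }

    public static void main(String[] args) {
        System.out.println(fuzzyMatches("miller", "m\u00fcller")); // true: one substitution
        System.out.println(fuzzyMatches("acme", "acne"));          // true: one edit allowed for 4 characters
        System.out.println(fuzzyMatches("ab", "ac"));              // false: short terms must match exactly
    }
}
```

The length-dependent budget is what keeps fuzziness useful: allowing two edits on a three-letter term would match almost anything.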

However, introducing Elasticsearch also had a drawback: one more application that requires monitoring and maintenance. Additionally, we had to create mappings for the Elasticsearch indices, search queries, and a process to extract, transform and load (ETL) data from the database into Elasticsearch.

Schema of the data-flow between the application, database, and Elasticsearch

We decided to use Logstash with the JDBC plugin to retrieve the data from the MSSQL database and feed it into the Elasticsearch server. Historically, each Logstash instance only supported a single pipeline, composed of an input, a filter, and an output stage. With the introduction of multiple pipelines to Logstash, it became very easy to set up specific pipelines for each use case that run independently from each other without the need to create conditions to separate them.

# generic example of the jdbc input, shown here against a MySQL database
# (for MS SQL Server, the driver library, driver class and connection string differ)
# the plugin persists its state between runs in the :sql_last_value parameter
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "corporation" => "ACME" }
    # run the statement once every minute (cron syntax)
    schedule => "* * * * *"
    # only fetch rows that were modified since the last run
    statement => "SELECT * FROM corporations WHERE corporation = :corporation
                    AND modified_date > :sql_last_value"
  }
}

Using this approach, we established an ETL process that copies all relevant fields, restructures them and continuously keeps them up to date. This copy can then be used to search and to filter the results quickly.
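The incremental update can be modeled in plain Java to show what :sql_last_value provides: each run only picks up rows modified after the stored value, and the stored value then advances. This is a simplified in-memory sketch, not the plugin's code. It roughly corresponds to configuring the jdbc input with use_column_value => true and tracking_column => "modified_date"; by default, :sql_last_value instead holds the time of the previous run. The Row type and the IncrementalSync class are hypothetical.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class IncrementalSync {

    // Hypothetical stand-in for a row of the corporations table
    static final class Row {
        final long id;
        final LocalDateTime modifiedDate;

        Row(long id, LocalDateTime modifiedDate) {
            this.id = id;
            this.modifiedDate = modifiedDate;
        }
    }

    // Plays the role of :sql_last_value; starts at the epoch so the first run picks up all rows
    private LocalDateTime lastValue = LocalDateTime.of(1970, 1, 1, 0, 0);

    // Equivalent of "... WHERE modified_date > :sql_last_value", followed by
    // advancing the tracked value to the newest modification that was seen
    List<Row> poll(List<Row> table) {
        List<Row> changed = new ArrayList<>();
        LocalDateTime newest = lastValue;
        for (Row row : table) {
            if (row.modifiedDate.isAfter(lastValue)) {
                changed.add(row);
                if (row.modifiedDate.isAfter(newest)) {
                    newest = row.modifiedDate;
                }
            }
        }
        lastValue = newest;
        return changed;
    }

    public static void main(String[] args) {
        IncrementalSync sync = new IncrementalSync();
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, LocalDateTime.of(2018, 5, 1, 10, 0)));
        table.add(new Row(2, LocalDateTime.of(2018, 5, 2, 10, 0)));
        System.out.println(sync.poll(table).size()); // 2: the first run copies everything
        System.out.println(sync.poll(table).size()); // 0: nothing changed since
        table.add(new Row(3, LocalDateTime.of(2018, 5, 3, 10, 0)));
        System.out.println(sync.poll(table).size()); // 1: only the new row
    }
}
```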

Next, the indices for all objects had to be set up correctly. An important consideration is whether to map strings as "keyword" or "text". The former is indexed as-is (not analyzed) and is used for structured content such as email addresses, hostnames, status codes, zip codes or tags, where exact matching, sorting and aggregations are needed; the latter is analyzed into individual terms and is used to index full-text values. When relationships between objects need to be represented, objects can be nested. However, since Lucene (on which Elasticsearch is based) has no concept of inner objects, object hierarchies are flattened into a simple list of field names and values. To keep each object in an array independent, the "nested" datatype has to be set on the inner objects. Note that nested fields can then only be queried and sorted on with nested queries and nested sorting.
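Why flattening matters can be illustrated with a small in-memory model in Java. This sketch only models the matching semantics, not how Lucene actually stores documents; the class, method and field names are hypothetical. An array of two users becomes one value list per field path, so a query for first name "Alice" AND last name "Smith" matches even though no such user exists. With the nested type, both conditions must hold within the same inner object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlatteningDemo {

    // Helper to build one inner object, e.g. {"first": "John", "last": "Smith"}
    static Map<String, String> user(String first, String last) {
        Map<String, String> user = new LinkedHashMap<>();
        user.put("first", first);
        user.put("last", last);
        return user;
    }

    // Models how an array of plain "object" fields is indexed: one value list
    // per field path, so the pairing of values within each object is lost
    static Map<String, List<String>> flatten(String path, List<Map<String, String>> objects) {
        Map<String, List<String>> flat = new LinkedHashMap<>();
        for (Map<String, String> object : objects) {
            for (Map.Entry<String, String> field : object.entrySet()) {
                flat.computeIfAbsent(path + "." + field.getKey(), k -> new ArrayList<>())
                    .add(field.getValue());
            }
        }
        return flat;
    }

    // "first == f AND last == l" evaluated against the flattened value lists
    static boolean matchesFlattened(Map<String, List<String>> flat, String path,
            String first, String last) {
        return flat.getOrDefault(path + ".first", Collections.emptyList()).contains(first)
            && flat.getOrDefault(path + ".last", Collections.emptyList()).contains(last);
    }

    // With the "nested" type, each inner object is indexed as its own hidden document,
    // so a nested query requires both conditions to hold within the same object
    static boolean matchesNested(List<Map<String, String>> objects, String first, String last) {
        for (Map<String, String> object : objects) {
            if (first.equals(object.get("first")) && last.equals(object.get("last"))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Map<String, String>> users = new ArrayList<>();
        users.add(user("John", "Smith"));
        users.add(user("Alice", "White"));

        Map<String, List<String>> flat = flatten("user", users);
        System.out.println(flat); // {user.first=[John, Alice], user.last=[Smith, White]}
        System.out.println(matchesFlattened(flat, "user", "Alice", "Smith")); // true: false positive
        System.out.println(matchesNested(users, "Alice", "Smith"));           // false
    }
}
```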

Integrating the client into the Spring application

Since our application is a Spring project and Spring Data supports Elasticsearch, the obvious first approach was to use the Elasticsearch client that comes "out of the box". However, at that time Spring Boot 2.x had not been released yet, and Spring Data only supported Elasticsearch 2.4.0. We also wanted to keep the Elasticsearch client implementation separate from the Spring stack so that the application's Spring stack would be easier to upgrade. Therefore, we decided to use the Jest client, which uses the REST API. For a comparison of the different clients, the article "Interfacing with Elasticsearch: Picking a Client" is quite helpful.

To integrate the Jest client into the application, we added the dependency and set the URL in the YAML configuration. Spring Boot automatically detects the JestClient on the classpath and checks whether the URL configured at spring.elasticsearch.jest.uris is reachable. It is a good idea to set the correct URL here, as it also affects Spring's health check.

spring:
    elasticsearch:
        jest:
            uris: ${SPRING_DATA_ELASTICSEARCH_HOST}

Since we used lower-case field names with underscores in Elasticsearch (such as short_name), the usual camel-case Java field names had to be mapped accordingly. Gson auto-configuration is available since Spring Boot 2.x, so parts of the following code fragment, such as the field naming policy, can instead be set via the spring.gson.* properties.

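import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.time.LocalDateTime;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class GsonConfig {

  @Bean
  public Gson gson() {
    // configure Gson for the Jest client (Elasticsearch): use the custom date (de)serializer
    // and map camelCase Java fields to lower_case_with_underscores JSON fields
    return new GsonBuilder()
        .registerTypeAdapter(LocalDateTime.class, new DateTimeTypeConverter())
        .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
        .create();
  }

}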
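To see what the naming policy does to a field such as ultimateIssuerName, here is a stdlib-only Java sketch that approximates the documented behavior of FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES (someFieldName becomes some_field_name). It is an illustration, not Gson's source, and the class name FieldNames is hypothetical.

```java
import java.util.Locale;

final class FieldNames {

    // Inserts '_' before every upper-case letter (except at the very start),
    // then lower-cases the whole name
    static String toLowerCaseWithUnderscores(String javaFieldName) {
        StringBuilder result = new StringBuilder();
        for (int i = 0; i < javaFieldName.length(); i++) {
            char c = javaFieldName.charAt(i);
            if (Character.isUpperCase(c) && result.length() > 0) {
                result.append('_');
            }
            result.append(c);
        }
        return result.toString().toLowerCase(Locale.ENGLISH);
    }

    public static void main(String[] args) {
        System.out.println(toLowerCaseWithUnderscores("ultimateIssuerName")); // ultimate_issuer_name
        System.out.println(toLowerCaseWithUnderscores("bloombergId"));        // bloomberg_id
        System.out.println(toLowerCaseWithUnderscores("aURL"));               // a_u_r_l
    }
}
```

Consecutive capitals are split letter by letter, so an acronym such as aURL becomes a_u_r_l. This is worth keeping in mind when naming Java fields that must line up with existing Elasticsearch field names.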

The DateTimeTypeConverter tells Gson how to parse the date fields stored in Elasticsearch into LocalDateTime values and how to serialize them back.

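import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateTimeTypeConverter implements JsonSerializer<LocalDateTime>,
    JsonDeserializer<LocalDateTime> {

  // the trailing 'Z' is matched as a literal, so values are assumed to be UTC
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

  @Override
  public LocalDateTime deserialize(JsonElement json, Type typeOfT,
      JsonDeserializationContext context) {
    return LocalDateTime.parse(json.getAsString(), FORMATTER);
  }

  @Override
  public JsonElement serialize(LocalDateTime src, Type typeOfSrc,
      JsonSerializationContext context) {
    // same formatter as deserialize (toString() would drop zero seconds/millis and the 'Z')
    return new JsonPrimitive(FORMATTER.format(src));
  }
}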
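The fixed pattern in this converter only accepts timestamps with exactly three fractional digits and a literal Z. If the documents in Elasticsearch may contain other ISO-8601 variants, a more lenient deserializer could parse the value as an OffsetDateTime and normalize it to UTC. The sketch below shows that alternative approach with java.time; it is not what the converter above does, and EsDates and parseUtc are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

final class EsDates {

    // Accepts ISO-8601 timestamps with an offset ("Z" or e.g. "+02:00"), with or without
    // fractional seconds, and converts them to a LocalDateTime in UTC
    static LocalDateTime parseUtc(String value) {
        return OffsetDateTime.parse(value)
            .withOffsetSameInstant(ZoneOffset.UTC)
            .toLocalDateTime();
    }

    public static void main(String[] args) {
        System.out.println(parseUtc("2018-05-11T10:15:30.123Z"));  // 2018-05-11T10:15:30.123
        System.out.println(parseUtc("2018-05-11T10:15:30Z"));      // 2018-05-11T10:15:30
        System.out.println(parseUtc("2018-05-11T12:15:30+02:00")); // 2018-05-11T10:15:30
    }
}
```

Inside a Gson JsonDeserializer, parseUtc(json.getAsString()) could replace the fixed-pattern parse, at the cost of slightly more work per value.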

The actual search query is a bool query wrapping a multi_match query. Per-field boosts raise the score of a search hit when the search term matches in certain fields (e.g. a match in the "name" field boosts the score more than a match in the "country_code" field).

public Page<Hit<IssuerDoc, Void>> searchForIssuers(String searchTerm, Integer pageNumber,
    Integer pageSize, IssuerSearchFilterDto filters, List<SortField> sortFields) {

  final BoolQueryBuilder query = QueryBuilders.boolQuery();
  if (Strings.isNotBlank(searchTerm)) {
    // fields without an explicit boost get the default boost of 1.0; the field name and
    // boost constants are defined elsewhere (their values appear in the generated query)
    query.must(QueryBuilders.multiMatchQuery(searchTerm,
        ULTIMATE_ISSUER_NAME,
        ULTIMATE_ISSUER_NAME + PHONETIC_SUFFIX,
        "address_line",
        COUNTRY_CODE,
        "city",
        "country",
        RATING_ANALYST_FIELD)
        .field("name", QUATTRO_BOOST)
        .field("short_name", QUATTRO_BOOST)
        .field("bloomberg_id", MAXIMUM_BOOST));
  }
  // (applying the filters is not shown in this excerpt)

  return getHits(IssuerDoc.class, pageNumber, pageSize, query, "issuer", sortFields);
}

The getHits method handles paging and sorting as well as the conversion to the target type.

private <T> Page<Hit<T, Void>> getHits(Class<T> sourceType,
    Integer pageNumber, Integer pageSize, BoolQueryBuilder query, String index,
    List<SortField> sortFields) {

  // pages are 1-based; from + size must not exceed the index's result window
  if (pageNumber < 1 || pageSize < 1 || (pageNumber * pageSize) > ES_MAX_RESULTS_WINDOW) {
    throw new BadRequestException(SEARCH_INVALID_PAGE,
        String.format("invalid page %d, page must be positive and must not exceed %d results",
        pageNumber, ES_MAX_RESULTS_WINDOW));
  }

  SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
      .size(pageSize)
      .from((pageNumber - 1) * pageSize);

  // sort -> be aware of nested and plain sort
  if (!sortFields.isEmpty()) {
    sortFields.forEach(s -> {
      if (!s.isNestedSort()) {
        searchSourceBuilder.sort(s.getFieldName(), s.getOrder());
      } else {
        // nested fields need their nested path; documents without a value are sorted last
        searchSourceBuilder.sort(
            SortBuilders.fieldSort(s.getFieldName())
                .setNestedPath(s.getNestedPath()).missing("_last").order(s.getOrder()));
      }
    });
    // use the relevance score as the final tie-breaker
    searchSourceBuilder.sort("_score");
  }

  searchSourceBuilder.query(query);

  Search.Builder searchBuilder = new Search.Builder(searchSourceBuilder.toString())
      .addIndex(index)
      .addType(index);

  try {
    SearchResult result = jestClient.execute(searchBuilder.build());
    if (result.isSucceeded()) {
      // report at most ES_MAX_RESULTS_WINDOW hits, since later pages cannot be fetched anyway
      return new PageImpl<>(result.getHits(sourceType),
          PageRequest.of(pageNumber - 1, pageSize),
          Math.min(ES_MAX_RESULTS_WINDOW, result.getTotal()));
    } else {
      throw new BadRequestException(ApiErrorCode.ELASTIC_SEARCH_ERROR, result.getErrorMessage());
    }
  } catch (IOException e) {
    LOGGER.error("Problem fetching data from Elasticsearch.", e);
  }
  return new PageImpl<>(Collections.emptyList());
}
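The paging arithmetic in getHits can be checked in isolation. The sketch below assumes that ES_MAX_RESULTS_WINDOW corresponds to the index.max_result_window setting, whose default is 10,000; Elasticsearch rejects requests where from + size exceeds it. Since from = (pageNumber - 1) * pageSize, the sum from + size is simply pageNumber * pageSize. PageWindow is a hypothetical helper.

```java
final class PageWindow {

    // Assumption: mirrors the default of the index.max_result_window setting
    static final int MAX_RESULT_WINDOW = 10_000;

    // Offset of the first hit for a 1-based page number
    static int from(int pageNumber, int pageSize) {
        return (pageNumber - 1) * pageSize;
    }

    // from + size = pageNumber * pageSize must not exceed the result window;
    // the multiplication is done in long to avoid int overflow for huge inputs
    static boolean isValid(int pageNumber, int pageSize) {
        return pageNumber >= 1 && pageSize >= 1
            && (long) pageNumber * pageSize <= MAX_RESULT_WINDOW;
    }

    public static void main(String[] args) {
        System.out.println(from(3, 20));      // 40
        System.out.println(isValid(500, 20)); // true: from = 9,980, size = 20
        System.out.println(isValid(501, 20)); // false: would reach beyond 10,000 hits
    }
}
```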

The resulting query sent to the Elasticsearch server looks as follows:

{
  "bool" : {
    "must" : [
      {
        "multi_match" : {
          "query" : "ACME",
          "fields" : [
            "address_line^1.0",
            "bloomberg_id^10.0",
            "city^1.0",
            "country^1.0",
            "country_code^1.0",
            "name^4.0",
            "ratings.analyst.acronym^1.0",
            "short_name^4.0",
            "ultimate_issuer_name^1.0",
            "ultimate_issuer_name.phonetic^1.0"
          ],
          "type" : "best_fields",
          "operator" : "OR",
          "slop" : 0,
          "prefix_length" : 0,
          "max_expansions" : 50,
          "lenient" : false,
          "zero_terms_query" : "NONE",
          "boost" : 1.0
        }
      }
    ],
    "disable_coord" : false,
    "adjust_pure_negative" : true,
    "boost" : 1.0
  }
}
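The generated query uses the multi_match type best_fields (the default). According to the Elasticsearch documentation, best_fields runs the query on each field and combines the results like a dis_max query: the score of the best matching field counts, plus tie_breaker (default 0.0) times the scores of the other matching fields. The Java sketch below models only that combination step, using made-up raw per-field scores; real per-field scores come from Lucene's similarity (BM25 by default since Elasticsearch 5.0). BestFieldsScore is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BestFieldsScore {

    // Combines per-field scores with dis_max semantics: the highest boosted
    // field score plus tieBreaker times the sum of the other boosted scores
    static double score(Map<String, Double> fieldScores, Map<String, Double> boosts,
            double tieBreaker) {
        double best = 0.0;
        double sum = 0.0;
        for (Map.Entry<String, Double> entry : fieldScores.entrySet()) {
            double boosted = entry.getValue() * boosts.getOrDefault(entry.getKey(), 1.0);
            best = Math.max(best, boosted);
            sum += boosted;
        }
        return best + tieBreaker * (sum - best);
    }

    public static void main(String[] args) {
        Map<String, Double> boosts = new LinkedHashMap<>();
        boosts.put("name", 4.0);
        boosts.put("bloomberg_id", 10.0);

        // made-up raw per-field scores, only for illustration
        Map<String, Double> hitA = new LinkedHashMap<>();
        hitA.put("name", 2.0);
        Map<String, Double> hitB = new LinkedHashMap<>();
        hitB.put("city", 3.0);
        hitB.put("name", 1.0);

        System.out.println(score(hitA, boosts, 0.0)); // 8.0
        System.out.println(score(hitB, boosts, 0.0)); // 4.0
        System.out.println(score(hitB, boosts, 0.5)); // 5.5
    }
}
```

With the default tie_breaker of 0.0, a strong match in one boosted field (such as "name") outranks weaker matches spread over several unboosted fields, which is the intended behavior for an issuer search.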

Experience with the live setup

Integration testing against an embedded Elasticsearch server (with test classes extending ESIntegTestCase) was not a trivial task, because Elasticsearch's built-in jar hell checker found conflicts in transitive dependencies that were tricky to resolve.

Setting up Elasticsearch in a Kubernetes cluster proved to be relatively simple, since several pre-built Docker images are available. Just make sure the Kubernetes deployment strategy is set to "Recreate": Elasticsearch is a stateful application, and you cannot have more than one pod running at the same time. Recreating the pod, however, meant that indices and data feeds repeatedly had to be recreated by hand, and since the search query logic depends heavily on the correct indices being present, this was not acceptable. Although indices can be rebuilt with zero downtime using aliases, it quickly became clear that this task needed automation. Elasticsearch allows automatic index creation to be turned off (action.auto_create_index), and index templates can be created via Logstash's Elasticsearch output plugin.
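Zero-downtime re-indexing with aliases roughly works like this: the application only queries an alias (for example orders), a new versioned index is created and filled in the background, and once it is complete, the alias is switched to it with a single _aliases request, which Elasticsearch applies atomically. The Java sketch below only builds the body of that request; the versioned naming scheme (orders_v1, orders_v2) and the class name are hypothetical, and sending the request (for example with the Jest client) is omitted.

```java
final class AliasSwap {

    // Hypothetical naming scheme: versioned physical indices behind one alias
    static String versionedIndex(String alias, int version) {
        return alias + "_v" + version;
    }

    // Body for POST /_aliases: remove the alias from the old index and add it to the new one.
    // Both actions are applied atomically, so searches against the alias never see a gap.
    // Index names cannot contain '"', so plain concatenation yields valid JSON here.
    static String swapBody(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
            + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
            + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
            + "]}";
    }

    public static void main(String[] args) {
        String oldIndex = versionedIndex("orders", 1);
        String newIndex = versionedIndex("orders", 2);
        // {"actions":[{"remove":{"index":"orders_v1","alias":"orders"}},{"add":{"index":"orders_v2","alias":"orders"}}]}
        System.out.println(swapBody("orders", oldIndex, newIndex));
    }
}
```

Note that an alias cannot share its name with an existing index, so this scheme only works if no concrete index called orders exists.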

# configure the elasticsearch output plugin to install an index template named "orders"
# before the pipeline sends the actual data;
# an existing template with the same name is overwritten,
# and the file orders-template.json is used as the payload
output {
    elasticsearch {
      hosts => ["${ES_URL}"]
      # use the database id as document id, so re-sent rows update existing documents
      document_id => "%{id}"
      index => "orders"
      template => "/usr/share/templates/orders-template.json"
      template_overwrite => true
      template_name => "orders"
    }
}

In conclusion, Elasticsearch improved search performance considerably. Additionally, this approach made it possible to improve the quality of the search results by scoring the search hits and ranking them accordingly. Finally, thanks to Elasticsearch's cluster architecture, scaling this solution can be as easy as adding more nodes.

The image credits go to Elastic.co

Peer Müller

Over the past years, Peer has worked as a Java developer, test engineer and software architect in a requirements engineering project at Biotronik. In total, he can look back on more than 8 years of experience in the IT industry. Already during his studies of…
