Cómo migrar de Elasticsearch 1.7 a 6.8 sin tiempo de inactividad

Mi última tarea en BigPanda fue actualizar un servicio existente que usaba Elasticsearch versión 1.7 a una versión más nueva de Elasticsearch, 6.8.1.

En esta publicación, compartiré cómo migramos de Elasticsearch 1.6 a 6.8 con estrictas restricciones como cero tiempo de inactividad, pérdida de datos y cero errores. También le proporcionaré un script que realiza la migración por usted.

Esta publicación contiene 6 capítulos (y uno es opcional):

  • ¿Qué hay para mi ahí dentro? -> ¿Cuáles fueron las novedades que nos llevaron a actualizar nuestra versión?
  • Las limitaciones -> ¿Cuáles eran nuestros requisitos comerciales?
  • Resolución de problemas -> ¿Cómo abordamos las limitaciones?
  • Avanzando -> El plan.
  • [Capítulo opcional] -> ¿Cómo manejamos el infame problema de la explosión de mapas?
  • Finalmente -> Cómo realizar la migración de datos entre clústeres.

Capítulo 1 - ¿Qué gano yo?

¿Qué beneficios esperábamos resolver al actualizar nuestro almacén de datos?

Hubo un par de razones:

  1. Problemas de rendimiento y estabilidad: estábamos experimentando una gran cantidad de interrupciones con un MTTR largo que nos causó muchos dolores de cabeza. Esto se reflejó en frecuentes altas latencias, alto uso de CPU y más problemas.
  2. Soporte inexistente en versiones antiguas de Elasticsearch: nos faltaban algunos conocimientos operativos en Elasticsearch y, cuando buscamos consultoría externa, nos animaron a migrar hacia adelante para recibir soporte.
  3. Asignaciones dinámicas en nuestro esquema: nuestro esquema actual en Elasticsearch 1.7 utilizó una característica llamada asignaciones dinámicas que hizo que nuestro clúster explotara varias veces. Por eso queríamos abordar este problema.
  4. Poca visibilidad en nuestro clúster existente: queríamos una mejor vista bajo el capó y vimos que las versiones posteriores tenían excelentes herramientas de exportación de métricas.

Capítulo 2 - Las limitaciones

  • Migración de tiempo de inactividad CERO: tenemos usuarios activos en nuestro sistema y no podíamos permitirnos que el sistema estuviera inactivo mientras estábamos migrando.
  • Plan de recuperación: no podíamos permitirnos "perder" o "corromper" datos, sin importar el costo. Así que necesitábamos preparar un plan de recuperación en caso de que fallara nuestra migración.
  • Cero errores: no pudimos cambiar la funcionalidad de búsqueda existente para los usuarios finales.

Capítulo 3 - Resolución de problemas y elaboración de un plan

Abordemos las limitaciones de la más simple a la más difícil:

Cero errores

Para abordar este requisito, estudié todas las posibles solicitudes que recibe el servicio y cuáles fueron sus resultados. Luego agregué pruebas unitarias donde fue necesario.

Además, agregué múltiples métricas (al Elasticsearch Indexery al new Elasticsearch Indexer) para rastrear la latencia, el rendimiento y el rendimiento, lo que me permitió validar que solo las mejoramos.

Plan de recuperación

Esto significa que necesitaba abordar la siguiente situación: implementé el nuevo código en producción y las cosas no funcionaban como se esperaba. ¿Qué puedo hacer al respecto entonces?

Como estaba trabajando en un servicio que usaba el abastecimiento de eventos, podía agregar otro oyente (diagrama adjunto a continuación) y comenzar a escribir en un nuevo clúster de Elasticsearch sin afectar el estado de producción

Migración sin tiempo de inactividad

El servicio actual está en modo en vivo y no se puede "desactivar" por períodos de más de 5 a 10 minutos. El truco para hacerlo bien es este:

  • Almacene un registro de todas las acciones que está manejando su servicio (usamos Kafka en producción)
  • Inicie el proceso de migración fuera de línea (y realice un seguimiento de la compensación antes de iniciar la migración)
  • Cuando finalice la migración, inicie el nuevo servicio contra el registro con el desplazamiento registrado y recupere el retraso
  • Cuando termine el retraso, cambie su interfaz para consultar el nuevo servicio y ya está.

Capítulo 4 - El plan

Nuestro servicio actual utiliza la siguiente arquitectura (basada en el paso de mensajes en Kafka):

  1. Event topiccontiene eventos producidos por otras aplicaciones (por ejemplo, UserId 3 created)
  2. Command topiccontiene la traducción de estos eventos en comandos específicos utilizados por esta aplicación (por ejemplo: Add userId 3)
  3. Elasticsearch 1.7: el almacén de datos command Topicleído por Elasticsearch Indexer.

Planeamos agregar otro consumidor ( new Elasticsearch Indexer) al command topic, que leerá exactamente los mismos mensajes y los escribirá en paralelo a Elasticsearch 6.8.

¿Donde debería empezar?

Para ser honesto, me consideraba un usuario novato de Elasticsearch. Para sentirme seguro de realizar esta tarea, tuve que pensar en la mejor manera de abordar este tema y aprenderlo. Algunas cosas que ayudaron fueron:

  1. Documentación: es un recurso increíblemente útil para todo Elasticsearch. Tómese el tiempo para leerlo y tomar notas (no se pierda: Mapping y QueryDsl).
  2. HTTP API: todo en CAT API. Esto fue muy útil para depurar cosas localmente y ver cómo responde Elasticsearch (no te pierdas: estado del clúster, índices de gato, búsqueda, eliminar índice).
  3. Métricas (❤️): desde el primer día, configuramos un nuevo y brillante panel con muchas métricas interesantes (tomadas de elasticsearch-exporter-for-Prometheus ) que nos ayudaron y nos empujaron a comprender más sobre Elasticsearch.

El código

Nuestro código base estaba usando una biblioteca llamada elastic4s y estaba usando la versión más antigua disponible en la biblioteca, ¡una muy buena razón para migrar! Entonces, lo primero que se debe hacer fue migrar versiones y ver qué se rompió.

Hay algunas tácticas sobre cómo realizar esta migración de código. La táctica que elegimos fue intentar restaurar la funcionalidad existente primero en la nueva versión de Elasticsearch sin volver a escribir todo el código desde el principio. En otras palabras, para alcanzar la funcionalidad existente pero en una versión más nueva de Elasticsearch.

Afortunadamente para nosotros, el código ya contenía una cobertura de prueba casi completa, por lo que nuestra tarea fue mucho más simple y eso tomó alrededor de 2 semanas de tiempo de desarrollo.

Es importante tener en cuenta que, si ese no fuera el caso, habríamos tenido que invertir algo de tiempo en completar esa cobertura. Solo entonces podríamos migrar, ya que una de nuestras limitaciones era no romper la funcionalidad existente.

Capítulo 5 - El problema de la explosión de mapas

Describamos nuestro caso de uso con más detalle. Este es nuestro modelo:

class InsertMessageCommand(tags: Map[String,String])

Y, por ejemplo, una instancia de este mensaje sería:

new InsertMessageCommand(Map("name"->"dor","lastName"->"sever"))

Y dado este modelo, necesitábamos admitir los siguientes requisitos de consulta:

  1. Consultar por valor
  2. Consultar por nombre y valor de etiqueta

La forma en que esto se modeló en nuestro esquema de Elasticsearch 1.7 fue utilizando un esquema de plantilla dinámica (ya que las claves de etiqueta son dinámicas y no se pueden modelar de forma avanzada).

The dynamic template caused us multiple outages due to the mapping explosion problem, and the schema looked like this:

curl -X PUT "localhost:9200/_template/my_template?pretty" -H 'Content-Type: application/json' -d ' { "index_patterns": [ "your-index-names*" ], "mappings": { "_doc": { "dynamic_templates": [ { "tags": { "mapping": { "type": "text" }, "path_match": "actions.tags.*" } } ] } }, "aliases": {} }' curl -X PUT "localhost:9200/your-index-names-1/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "John", "lname" : "Smith" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/2?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "Dor", "lname" : "Sever" } } } ' curl -X PUT "localhost:9200/your-index-names-1/_doc/3?pretty" -H 'Content-Type: application/json' -d' { "actions": { "tags" : { "name": "AnotherName", "lname" : "AnotherLastName" } } } ' 
 curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.name" : { "query" : "John" } } } } ' # returns 1 match(doc 1) curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "match" : { "actions.tags.lname" : { "query" : "John" } } } } ' # returns zero matches # search by value curl -X GET "localhost:9200/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "query_string" : { "fields": ["actions.tags.*" ], "query" : "Dor" } } } ' 

Nested documents solution

Our first instinct in solving the mapping explosion problem was to use nested documents.

We read the nested data type tutorial in the Elastic docs and defined the following schema and queries:

curl -X PUT "localhost:9200/my_index?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "nested" } } } } } ' curl -X PUT "localhost:9200/my_index/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "key" : "John", "value" : "Smith" }, { "key" : "Alice", "value" : "White" } ] } ' # Query by tag key and value curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.key": "Alice" }}, { "match": { "tags.value": "White" }} ] } } } } } ' # Returns 1 document curl -X GET "localhost:9200/my_index/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "nested": { "path": "tags", "query": { "bool": { "must": [ { "match": { "tags.value": "Smith" }} ] } } } } } ' # Query by tag value # Returns 1 result 

And this solution worked. However, when we tried to insert real customer data we saw that the number of documents in our index increased by around 500 times.

We thought about the following problems and went on to find a better solution:

  1. The amount of documents we had in our cluster was around 500 million documents. This meant that, with the new schema, we were going to reach two hundred fifty billion documents (that’s 250,000,000,000 documents ?).
  2. We read this really good blog post — //blog.gojekengineering.com/elasticsearch-the-trouble-with-nested-documents-e97b33b46194 which highlights that nested documents can cause high latency in queries and heap usage problems.
  3. Testing — Since we were converting 1 document in the old cluster to an unknown number of documents in the new cluster, it would be much harder to track if the migration process worked without any data loss. If our conversion was 1:1, we could assert that the count in the old cluster equalled the count in the new cluster.

Avoiding nested documents

The real trick in this was to focus on what supported queries we were running: search by tag value, and search by tag key and value.

The first query does not require nested documents since it works on a single field. For the latter, we did the following trick. We created a field that contains the combination of the key and the value. Whenever a user queries on a key, value match, we translate their request to the corresponding text and query against that field.

Example:

curl -X PUT "localhost:9200/my_index_2?pretty" -H 'Content-Type: application/json' -d' { "mappings": { "_doc": { "properties": { "tags": { "type": "object", "properties": { "keyToValue": { "type": "keyword" }, "value": { "type": "keyword" } } } } } } } ' curl -X PUT "localhost:9200/my_index_2/_doc/1?pretty" -H 'Content-Type: application/json' -d' { "tags" : [ { "keyToValue" : "John:Smith", "value" : "Smith" }, { "keyToValue" : "Alice:White", "value" : "White" } ] } ' # Query by key,value # User queries for key: Alice, and value : White , we then query elastic with this query: curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.keyToValue": "Alice:White" }}] }}} ' # Query by value only curl -X GET "localhost:9200/my_index_2/_search?pretty" -H 'Content-Type: application/json' -d' { "query": { "bool": { "must": [ { "match": { "tags.value": "White" }}] }}} ' 

Chapter 6 — The migration process

We planned to migrate about 500 million documents with zero downtime. To do that we needed:

  1. A strategy on how to transfer data from the old Elastic to the new Elasticsearch
  2. A strategy on how to close the lag between the start of the migration and the end of it

And our two options in closing the lag:

  1. Our messaging system is Kafka based. We could have just taken the current offset before the migration started, and after the migration ended, start consuming from that specific offset. This solution requires some manual tweaking of offsets and some other stuff, but will work.
  2. Another approach to solving this issue was to start consuming messages from the beginning of the topic in Kafka and make our actions on Elasticsearch idempotent — meaning, if the change was “applied” already, nothing would change in Elastic store.

The requests made by our service against Elastic were already idempotent, so we choose option 2 because it required zero manual work (no need to take specific offsets, and then set them afterward in a new consumer group).

How can we migrate the data?

These were the options we thought of:

  1. If our Kafka contained all messages from the beginning of time, we could just play from the start and the end state would be equal. But since we apply retention to out topics, this was not an option.
  2. Dump messages to disk and then ingest them to Elastic directly – This solution looked kind of weird. Why store them in disk instead of just writing them directly to Elastic?
  3. Transfer messages between old Elastic to new Elastic — This meant, writing some sort of “script” (did anyone say Python? ?) that will connect to the old Elasticsearch cluster, query for items, transform them to the new schema, and index them in the cluster.

We choose the last option. These were the design choices we had in mind:

  1. Let’s not try to think about error handling unless we need to. Let’s try to write something super simple, and if errors occur, let’s try to address them. In the end, we did not need to address this issue since no errors occurred during the migration.
  2. It’s a one-off operation, so whatever works first / KISS.
  3. Metrics — Since the migration processes can take hours to days, we wanted the ability from day 1 to be able to monitor the error count and to track the current progress and copy rate of the script.

We thought long and hard and choose Python as our weapon of choice. The final version of the code is below:

dictor==0.1.2 - to copy and transform our Elasticsearch documentselasticsearch==1.9.0 - to connect to "old" Elasticsearchelasticsearch6==6.4.2 - to connect to the "new" Elasticsearchstatsd==3.3.0 - to report metrics 
from elasticsearch import Elasticsearch from elasticsearch6 import Elasticsearch as Elasticsearch6 import sys from elasticsearch.helpers import scan from elasticsearch6.helpers import parallel_bulk import statsd ES_SOURCE = Elasticsearch(sys.argv[1]) ES_TARGET = Elasticsearch6(sys.argv[2]) INDEX_SOURCE = sys.argv[3] INDEX_TARGET = sys.argv[4] QUERY_MATCH_ALL = {"query": {"match_all": {}}} SCAN_SIZE = 1000 SCAN_REQUEST_TIMEOUT = '3m' REQUEST_TIMEOUT = 180 MAX_CHUNK_BYTES = 15 * 1024 * 1024 RAISE_ON_ERROR = False def transform_item(item, index_target): # implement your logic transformation here transformed_source_doc = item.get("_source") return {"_index": index_target, "_type": "_doc", "_id": item['_id'], "_source": transformed_source_doc} def transformedStream(es_source, match_query, index_source, index_target, transform_logic_func): for item in scan(es_source, query=match_query, index=index_source, size=SCAN_SIZE, timeout=SCAN_REQUEST_TIMEOUT): yield transform_logic_func(item, index_target) def index_source_to_target(es_source, es_target, match_query, index_source, index_target, bulk_size, statsd_client, logger, transform_logic_func): ok_count = 0 fail_count = 0 count_response = es_source.count(index=index_source, body=match_query) count_result = count_response['count'] statsd_client.gauge(stat='elastic_migration_document_total_count,index={0},type=success'.format(index_target), value=count_result) with statsd_client.timer('elastic_migration_time_ms,index={0}'.format(index_target)): actions_stream = transformedStream(es_source, match_query, index_source, index_target, transform_logic_func) for (ok, item) in parallel_bulk(es_target, chunk_size=bulk_size, max_chunk_bytes=MAX_CHUNK_BYTES, actions=actions_stream, request_timeout=REQUEST_TIMEOUT, raise_on_error=RAISE_ON_ERROR): if not ok: logger.error("got error on index {} which is : {}".format(index_target, item)) fail_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=failure'.format(index_target), 1) else: ok_count += 1 statsd_client.incr('elastic_migration_document_count,index={0},type=success'.format(index_target), 1) return ok_count, fail_count statsd_client = statsd.StatsClient(host='localhost', port=8125) if __name__ == "__main__": index_source_to_target(ES_SOURCE, ES_TARGET, QUERY_MATCH_ALL, INDEX_SOURCE, INDEX_TARGET, BULK_SIZE, statsd_client, transform_item) 

Conclusion

Migrating data in a live production system is a complicated task that requires a lot of attention and careful planning. I recommend taking the time to work through the steps listed above and figure out what works best for your needs.

As a rule of thumb, always try to reduce your requirements as much as possible. For example, is a zero downtime migration required? Can you afford data-loss?

Upgrading data stores is usually a marathon and not a sprint, so take a deep breath and try to enjoy the ride.

  • The whole process listed above took me around 4 months of work
  • All of the Elasticsearch examples that appear in this post have been tested against version 6.8.1