Nube de primavera de flujo de datos errorChannel no funciona

votos
0

Estoy intentando crear un manejador de excepción personalizada para mi Secuencia del resorte Nube de flujo de datos para encaminar algunos errores que se requeued y otros a ser DLQ'd.

Para ello estoy utilizando la integración global de la primavera errorChannel y enrutamiento basado en el tipo de excepción.

Este es el código para el router error de integración de primavera:

package com.acme.error.router;

import com.acme.exceptions.DlqException;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;


@MessageEndpoint
@EnableBinding({ ErrorMessageChannels.class })
public class ErrorMessageMappingRouter {
   private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMessageMappingRouter.class);

   public static final String ERROR_CHANNEL = errorChannel;

   @Router(inputChannel = ERROR_CHANNEL)
    public String onError(Message<Object> message) {
      LOGGER.debug(ERROR ROUTER - onError);
      if(message.getPayload() instanceof MessageTransformationException) {
         MessageTransformationException exception = (MessageTransformationException) message.getPayload();
         Message<?> failedMessage = exception.getFailedMessage();
          if(exceptionChainContainsDlq(exception)) {
             return ErrorMessageChannels.DLQ_QUEUE_NAME;
          }
         return ErrorMessageChannels.REQUEUE_CHANNEL;
      }
      return ErrorMessageChannels.DLQ_QUEUE_NAME;
    }

    ...

}

El router de error es recogido por cada una de las aplicaciones de flujo a través de una exploración paquete en el arranque de aplicaciones para cada primavera:

@ComponentScan(basePackages = { com.acme.error.router }
@SpringBootApplication
public class StreamApp {}

Cuando esto se implementa y ejecuta con el servidor de la nube de primavera de flujo de datos local (versión 1.5.0-RELEASE), y una DlqException es lanzada, el mensaje se enruta correctamente al método onError en el errorRouter y luego se coloca en el tema DLQ.

Sin embargo, cuando esta se implementa como un recipiente ventana acoplable con el servidor SCDF Kubernetes (también versión 1.5.0-RELEASE), el método onError nunca es golpeado. (La declaración de registro en el inicio del router no es de salida)

En los registros de inicio para las aplicaciones de flujo, parece que el grano es recogido correctamente y se registra como un detector para el errorChannel, pero por alguna razón, cuando se producen excepciones no consiguen manejado por el método onError en nuestro router.

Registros de inicio:

o.s.i.endpoint.EventDrivenConsumer : Adding {router:errorMessageMappingRouter.onError.router} as a subscriber to the 'errorChannel' channel
o.s.i.channel.PublishSubscribeChannel : Channel 'errorChannel' has 1 subscriber(s).
o.s.i.endpoint.EventDrivenConsumer : started errorMessageMappingRouter.onError.router

Estamos utilizando todos los valores predeterminados para las configuraciones de resorte secuencia nube y ligantes kafka:

spring.cloud:
  stream:
    binders:
      kafka:
        type: kafka
        environment.spring.cloud.stream.kafka.binder.brokers=brokerlist
        environment.spring.cloud.stream.kafka.binder.zkNodes=zklist

Edit: Añadido args vaina de kubectl describe <pod>

Args:
--spring.cloud.stream.bindings.input.group=delivery-stream
--spring.cloud.stream.bindings.output.producer.requiredGroups=delivery-stream
--spring.cloud.stream.bindings.output.destination=delivery-stream.enricher
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.zkNodes=<zkNodes>
--spring.cloud.stream.binders.xdkafka.type=kafka
--spring.cloud.stream.binders.xdkafka.defaultCandidate=true
--spring.cloud.stream.binders.xdkafka.environment.spring.cloud.stream.kafka.binder.brokers=<brokers>
--spring.cloud.stream.bindings.input.destination=delivery-stream.config-enricher

Otra idea que intentamos estaba tratando de utilizar la primavera Nube Stream - soporte del canal error de integración de primavera para enviar a un tema en el corredor de errores, pero ya que los mensajes no parece estar aterrizando en el mundial Integración primavera errorChannel en absoluto, que didn' t funciona bien.

¿Hay algo especial que tenemos que hacer en SCDF Kubernetes para permitir la integración global de la primavera errorChannel?

¿Que me estoy perdiendo aqui?

Actualizar con la solución de los comentarios:

Después de revisar su configuración ahora estoy bastante seguro de que sé cuál es el problema. Tiene un escenario configuración multi-aglutinante. Incluso si sólo se ocupa de una sola instancia aglutinante de la existencia de spring.cloud.stream.binders .... es lo que está pasando para que el marco tratarla como multi-aglutinante. Básicamente esto un error - github.com/spring-cloud/spring-cloud-stream/issues/1384. Como se puede ver que lo arreglaron pero hay que actualizar a Elmhurst.SR2 o tomar la última instantánea (estamos en RC2 y 2.1.0.RELEASE está en pocas semanas de todos modos) - Oleg Zhurakousky

Este fue de hecho el problema con nuestra configuración. En lugar de actualizar, simplemente eliminamos nuestro uso de múltiples ligante por el momento y se resolvió el problema.

Publicado el 27/11/2018 a las 16:49
fuente por usuario
En otros idiomas...                            


1 respuestas

votos
0

Actualizar con la solución de los comentarios:

Después de revisar su configuración ahora estoy bastante seguro de que sé cuál es el problema. Tiene un escenario configuración multi-aglutinante. Incluso si sólo se ocupa de una sola instancia aglutinante de la existencia de spring.cloud.stream.binders .... es lo que está pasando para que el marco tratarla como multi-aglutinante. Básicamente esto un error - github.com/spring-cloud/spring-cloud-stream/issues/1384. Como se puede ver que lo arreglaron pero hay que actualizar a Elmhurst.SR2 o tomar la última instantánea (estamos en RC2 y 2.1.0.RELEASE está en pocas semanas de todos modos) - Oleg Zhurakousky

Este fue de hecho el problema con nuestra configuración. En lugar de actualizar, simplemente eliminamos nuestro uso de múltiples ligante por el momento y se resolvió el problema.

Respondida el 05/12/2018 a las 22:05
fuente por usuario

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more