Avanzando TestScheduler de RxJava para una llamada de bloqueo de retardo ()

votos
0

Usando RxJava2 tengo una clase que sirve de puente reactiva al mundo no reactivo. En esta clase, hay un método reemplazado, que devuelve algún valor después de un procesamiento potencialmente larga . Este procesamiento se incrusta al mundo reactiva usando Single.create(emitter -> ...).

El tiempo de procesamiento es crucial para mí, ya que hay una diferencia en cuanto a la lógica de negocio, ya sea un segundo abonado aparece mientras se está ejecutando el procesamiento o después de que se ha completado (o cancelada).

Ahora estoy en pruebas de esta clase - por lo tanto, necesito el largo tiempo de procesamiento a ser virtual única . RxJava de TestScheduleral rescate!

Para crear un banco de pruebas de implantación de dicho puente base de clase, que emula el retraso, necesito un bloqueo (!) Pedir a la TestScheduler, donde el avance en el tiempo se dispara en un hilo separado. (De lo contrario, el avance en el tiempo nunca se activará.)

Mi problema es que necesito el avance en el tiempo que se retrase al menos hasta que la 'computación caro' está bloqueando - de lo contrario el tiempo de cambiarme antes de la demora de operador se pone activo y, por lo tanto, se espera hasta que otro avance, que nunca va a pasar.

Puedo resolver esto llamando Thread.sleep(100)antes de la llamada para avanzar en el tiempo - pero que parece un poco feo para mí ... ¿Cuánto tiempo esperar? Para múltiples pruebas, que el tiempo se suma, pero no debido a problemas de tiempo ... uf.

¿Alguna idea de cómo poner a prueba esta situación de una manera más limpia con suerte? ¡Gracias!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println(emitting on  + Thread.currentThread() +  ...);

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println(onSubcribe: completable))
                       .blockingAwait();

            System.out.println(<< blocking released >>);

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess(FooBar!);

        });


        System.out.println(running the task ...);
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println(onSubcribe: single))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println(---SLEEPING---);
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println(---advancing time---);
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println(await...);
        testObserver.await();

        // assertions on task results, etc.
        System.out.println(finished. now do some testing... values (expected [FooBar!]):  + testObserver.values());

    }

}

[Nota: He hecho esta pregunta de un modo más complejo y detallado en el ayer - pero ya que era demasiado caliente en mi oficina y tenía varios defectos y errores. Por lo tanto he borrado y ahora es más condensado con el problema real.]

Publicado el 19/09/2018 a las 13:31
fuente por usuario
En otros idiomas...                            


1 respuestas

votos
0

Si entiendo su problema correctamente, usted tiene un servicio de larga duración y que desea ser capaz de probar las suscripciones a servicios que se verán afectados por su servicio de larga duración. Me he acercado a esto de la siguiente manera.

Definir su servicio como usted tiene, utilizando una TestSchedulerpara controlar su avance en el tiempo. A continuación, defina sus estímulos de prueba utilizando el mismo planificador de prueba.

Esto emula la entidad de bloqueo. No utilice "bloque".

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Debería ver que performTestAction1()ocurre, a continuación, performTestAction2()a continuación, los singleacabados, a continuación performTestAction3(). Todo eso es impulsado por el planificador de prueba.

Respondida el 20/09/2018 a las 15:00
fuente por usuario

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more