Page History
...
Code Block | ||
---|---|---|
| ||
/**
 * Integration test that starts the Hello Kafka Streams topology and publishes
 * [HelloKTableState] records to the `hello-log-store` topic.
 *
 * Each test gets a fresh producer and a fresh streams instance; both are closed
 * again in [tearDown].
 *
 * NOTE(review): presumably requires a reachable Kafka broker and Redis instance;
 * confirm against the test environment configuration.
 */
@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>

    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    /**
     * Creates the producer and the streams instance, starts the streams and
     * blocks until they report [KafkaStreams.State.RUNNING].
     *
     * @throws IllegalStateException if RUNNING is not reached within
     *   [STARTUP_TIMEOUT_SECONDS] seconds.
     */
    @BeforeTest
    fun setUp() {
        producer = createKafkaProducer()
        kstreams = createHelloKStreams(redisService)

        // The listener is registered before start() so the RUNNING transition cannot be missed.
        val running = CountDownLatch(1)
        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown()
            }
        }
        kstreams.streams.start()

        // Bounded wait: an unbounded await() would hang the whole suite if the
        // streams instance never reaches RUNNING.
        check(running.await(STARTUP_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
            "Kafka Streams did not reach RUNNING within $STARTUP_TIMEOUT_SECONDS seconds " +
                "(current state: ${kstreams.streams.state()})"
        }

        // Wait for the state store to be ready. The returned store is not used here;
        // the call is kept for its waiting effect (presumably it retries until the
        // store is queryable; verify against getStateStoreWithRetries).
        getStateStoreWithRetries<String, HelloKTableState>(
            kstreams.streams,
            "hello-state-store",
        )
    }

    /**
     * Closes the producer and the streams instance.
     *
     * Both properties are checked for initialization because this method can run
     * after a partially failed [setUp]; an unguarded access would otherwise throw
     * and hide the original setup failure.
     */
    @AfterTest
    fun tearDown() {
        try {
            if (::producer.isInitialized) {
                producer.close(Duration.ofSeconds(3))
            }
        } finally {
            // Always attempt to stop the streams, even if closing the producer failed.
            if (::kstreams.isInitialized) {
                kstreams.streams.close()
            }
        }
    }

    /**
     * Publishes ten [HelloKTableState] records under a single key and then gives
     * the topology time to consume them.
     */
    @Test
    fun testHelloCommand() {
        val testPersistId = "testid-01"

        for (i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(
                org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState),
            )
        }
        // send() is asynchronous; flush so every record has left the producer
        // before the processing wait below starts.
        producer.flush()

        // Give the streams topology time to process the published records.
        Thread.sleep(5000)

        // TODO(review): this test asserts nothing; consider querying
        // "hello-state-store" for testPersistId and asserting the final state.
    }

    private companion object {
        /** Maximum time, in seconds, to wait for the streams instance to reach RUNNING. */
        const val STARTUP_TIMEOUT_SECONDS = 60L
    }
}