kafka rebalance

I have set up a program to consume from a Kafka topic.

It had been running fine until roughly 25% of the messages (out of 2-3 million rows) were consumed, at which point a lot of rebalances started happening.

I tried increasing `max.poll.interval.ms`, reducing `max.poll.records`, and processing the messages asynchronously. I also set up the `onPartitionsAssigned` and `onPartitionsRevoked` methods. However, the rebalances kept happening, and the program kept getting stuck at around 25%.
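For reference, the configuration attempts above looked roughly like this (the broker address, topic, group id and values are illustrative):

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
    // allow more time between poll() calls before the group coordinator evicts the consumer
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
    // fetch fewer records per poll so each batch can be processed within that interval
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

    KafkaConsumer<String, String> consumer =
            new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // e.g. log the assignment or seek to the last committed offsets
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // e.g. commit the offsets for the partitions being taken away
        }
    });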

After a lot of investigation and trials, it turned out this was due to the memory footprint of the program.

As more messages are put onto the heap (several snapshots × 25% × 2-3 million rows), the JVM gets busy running the GC threads, which eventually blocks the consumer from sending back the heartbeat in time.

Filtering the messages, or running this on a server, fixes the issue.
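For illustration, the filtering could look roughly like this inside the poll loop (`isRelevant` and `process` are hypothetical helpers):

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        // keep only the rows that are actually needed, instead of
        // accumulating all of the messages on the heap
        if (isRelevant(record.value())) {   // hypothetical predicate
            process(record.value());        // hypothetical handler
        }
        // everything else is dropped right away and becomes eligible for GC
    }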

mockito matchers

When mocking a JDBC query, the statement below was throwing an exception:

doReturn("result").when(jdbc).queryForObject(anyString(), String.class);

It turns out this was due to Mockito not allowing a mix of matchers and raw arguments.

Changing the above to

doReturn("result").when(jdbc).queryForObject(anyString(), eq(String.class));

worked.
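For completeness, a minimal sketch of the working setup (assuming Spring's `JdbcTemplate` as the mocked dependency; the query string is arbitrary):

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    import org.junit.jupiter.api.Test;
    import org.springframework.jdbc.core.JdbcTemplate;

    class JdbcQueryMockTest {
        @Test
        void stubsQueryForObject() {
            JdbcTemplate jdbc = mock(JdbcTemplate.class);
            // every argument is now a matcher, so Mockito accepts the stubbing
            doReturn("result").when(jdbc).queryForObject(anyString(), eq(String.class));

            // the arbitrary query below matches anyString()
            String value = jdbc.queryForObject("select name from t where id = 1", String.class);
            // value == "result"
        }
    }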

Arrays.fill

The two code blocks below return different results:

        List<Integer>[] rg = new List[n];
        Arrays.fill(rg, new ArrayList<>());
...
        List<List<Integer>> rg = new ArrayList<>();
        for (int i = 0; i < n; ++i) {
            rg.add(new ArrayList<>());
        }
...

It turns out that with `Arrays.fill`, the second parameter is evaluated first, and then that single object is assigned to every array element:

    public static void fill(Object[] a, Object val) {
        for (int i = 0, len = a.length; i < len; i++)
            a[i] = val;
    }

while `rg.add(new ArrayList<>());` creates a different object for each element.
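A quick way to see the difference (a small sketch; `n` and the value 42 are arbitrary):

        int n = 3;
        List<Integer>[] rg = new List[n];
        Arrays.fill(rg, new ArrayList<>());   // one ArrayList instance shared by every slot
        rg[0].add(42);
        System.out.println(rg[1]);            // prints [42], because rg[0] == rg[1] == rg[2]

        List<List<Integer>> rg2 = new ArrayList<>();
        for (int i = 0; i < n; ++i) {
            rg2.add(new ArrayList<>());       // a fresh list for each element
        }
        rg2.get(0).add(42);
        System.out.println(rg2.get(1));       // prints []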

trick to sort map by value

A `TreeMap` can be used to sort a map by value:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> comparator.compare(map.get(o2), map.get(o1)));
        sorted.putAll(map);

However, there is a caveat: if two values are the same, the TreeMap drops the entry (technically, since the comparator returns 0, it treats the new key as already present and just updates that existing key's value).

The trick is to make the comparator never return 0, i.e. to override the equality condition:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1: 1);
        sorted.putAll(map);
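For example, with an arbitrary character-count map (note that since the comparator never returns 0 and is inconsistent with equals, the resulting TreeMap is really only good for iterating in sorted order, not for further lookups by key):

        Map<Character, Integer> map = new HashMap<>();
        map.put('a', 3);
        map.put('b', 1);
        map.put('c', 3);   // same value as 'a'

        // never return 0, so entries with equal values are not collapsed
        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1 : 1);
        sorted.putAll(map);

        System.out.println(sorted);   // descending by value, e.g. {c=3, a=3, b=1}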

trick to iterate stream with index

With an `AtomicInteger`, it is possible to keep a running index while traversing a stream over a Java collection:

        final ForkJoinPool factorThreads = new ForkJoinPool(4);   // pool size chosen arbitrarily
        // 1,200 sample integers to traverse (log is assumed to be an SLF4J-style logger)
        final List<Integer> numbers = IntStream.range(0, 1200).boxed().collect(Collectors.toList());

        final int partition = 100;
        // the AtomicInteger supplies a running index for the otherwise index-less stream
        final AtomicInteger counter = new AtomicInteger();

        final ForkJoinTask<Integer> task = factorThreads.submit(() ->
                numbers.stream()
                        // group every 100 consecutive elements together, keyed by index / partition
                        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / partition))
                        .values()
                        .stream().map(integers -> {
                            log.info("check the list {}", integers);
                            return integers.size();
                        }).reduce(0, Integer::sum)
        );
        if (task.join() != 1200)
            log.error("the numbers do not tally");
        else log.error("all 1200 numbers counted");

force parallel processing

There are times when, even though the application requests parallel processing, it sticks to a single thread without rotation.

For example, the case described in https://lwpro2.dev/2021/06/11/jdk-spliterator-with-parallel-processing/.

This could be similar to thread affinity, https://en.wikipedia.org/wiki/Processor_affinity.

In Java, one way to force the parallel processing is to signal the processor to rotate the time slicing:

    stream.parallel()
            .peek(i -> {
                try {
                    // sleeping for 0 ms is a hint to the scheduler to yield the current time slice
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })

As such, even though the time quantum could be larger than the duration of the first-phase tasks, the thread can volunteer to round-robin within the pool, and the process can then proceed with the subsequent parallel processing, instead of leaving the original single thread to handle the second-phase tasks.
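A minimal sketch of the pattern (the range size and the work in the second phase are made up for illustration):

    List<Integer> results = IntStream.range(0, 16).boxed()
            .parallel()
            .peek(i -> {
                try {
                    Thread.sleep(0);   // volunteer to give up the time slice before the heavier phase
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            })
            .map(i -> {
                // heavier second-phase work; log the worker thread to observe the rotation
                System.out.println(Thread.currentThread().getName() + " -> " + i);
                return i * i;
            })
            .collect(Collectors.toList());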