TimSort

When writing a comparator using a lambda, Java throws this exception:

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:777)
        at java.util.TimSort.mergeAt(TimSort.java:514)
        at java.util.TimSort.mergeCollapse(TimSort.java:441)
        at java.util.TimSort.sort(TimSort.java:245)
        at java.util.Arrays.sort(Arrays.java:1512)
        at java.util.ArrayList.sort(ArrayList.java:1464)
        at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:392)
        at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
        at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)

It seems `TimSort` runs a safety check to enforce the `Comparator` contract; however, the check does not always fire.

The failure is intermittent, so the test below needs enough iterations to reproduce it:

    @Test
    public void test3() {
        List<Domain> domains = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
//            domains.clear();
            domains.add(Domain.builder().val(0.1).name("1").build());
            domains.add(Domain.builder().val(0.5).name("2").build());
            domains.add(Domain.builder().val(1.2).name("3").build());
            domains.add(Domain.builder().val(2.1).name("4").build());
            Collections.shuffle(domains);
            log.info("{}th start sorting {}", i, domains);
            log.info("check sorted {}", domains.parallelStream().sorted((o1, o2) -> {
                        final int diff = (int) (o2.val - o1.val); // BUG: int truncation maps e.g. 0.4 to 0, breaking transitivity
                        log.info("check {} - {} = {}", o1, o2, diff);
                        return diff;
                    }).collect(Collectors.toList())
            );
        }

    }
    @Test
    public void test4() {
        List<Domain> domains = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
//            domains.clear();
            domains.add(Domain.builder().val(0.1).name("1").build());
            domains.add(Domain.builder().val(0.5).name("2").build());
            domains.add(Domain.builder().val(1.2).name("3").build());
            domains.add(Domain.builder().val(2.1).name("4").build());
            Collections.shuffle(domains);
//            log.info("{}th start sorting {}", i, domains);
            log.info("check sorted {}", domains.parallelStream().sorted((o1, o2) -> Double.compare(o2.val, o1.val)).collect(Collectors.toList()).size()
            );
        }

    }
}

@Data
@Builder
class Domain {
    double val;
    String name;
}
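The root cause is the `(int)(o2.val - o1.val)` cast in `test3`: small differences truncate to 0, so pairs of distinct values are reported as equal even though transitivity demands otherwise. A minimal standalone sketch of the violation (values taken from the `Domain` objects above):

```java
import java.util.Comparator;

public class Main {
    public static void main(String[] args) {
        // The truncating comparator from test3, extracted on its own
        Comparator<Double> broken = (a, b) -> (int) (b - a);

        int ab = broken.compare(0.1, 0.5); // (int) 0.4 == 0 -> "equal"
        int bc = broken.compare(0.5, 1.2); // (int) 0.7 == 0 -> "equal"
        int ac = broken.compare(0.1, 1.2); // (int) 1.1 == 1 -> "greater": transitivity is broken

        System.out.println(ab + " " + bc + " " + ac);
    }
}
```

`test4` avoids the problem by using `Double.compare`, which never collapses distinct values into a spurious tie.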

kafka rebalance

I set up a program to consume from a Kafka topic.

It ran fine until roughly 25% of the messages (out of 2-3 million rows) were consumed, at which point a lot of rebalances started happening.

I tried increasing `max.poll.interval.ms`, reducing `max.poll.records`, and processing the messages asynchronously. I also set up the `onPartitionsAssigned` and `onPartitionsRevoked` callbacks. However, rebalances kept happening, and the program kept getting stuck at around 25%.

After a lot of investigation and trials, it turned out to be the memory footprint of the program.

As more messages accumulated on the heap (a sizeable share of the 2-3 million rows by the 25% mark), the JVM got busy running GC threads, which eventually prevented the consumer from sending heartbeats back in time.

Filtering the messages down, or running the program on a server with more memory, fixed the issue.
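For reference, the consumer tuning attempted above can be sketched as plain consumer properties. The values here are hypothetical examples, and as described, they did not fix the GC-induced heartbeat problem:

```java
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical values -- tune to your workload
        props.setProperty("max.poll.interval.ms", "600000"); // allow more time between polls
        props.setProperty("max.poll.records", "100");        // fetch fewer records per poll
        System.out.println(props.getProperty("max.poll.interval.ms"));
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```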

mockito matchers

When mocking a JDBC query, the statement below threw an exception:

    doReturn("result").when(jdbc).queryForObject(anyString(), String.class);

It turns out Mockito does not allow mixing matchers with raw arguments: once one argument uses a matcher, every argument must be a matcher.

Changing it to

    doReturn("result").when(jdbc).queryForObject(anyString(), eq(String.class));

worked.

Arrays.fill

The two code blocks below produce different results:

        List<Integer>[] rg = new List[n];
        Arrays.fill(rg, new ArrayList<>());
...
        List<List<Integer>> rg = new ArrayList<>();
        for (int i = 0; i < n; ++i) {
            rg.add(new ArrayList<>());
        }
...

It turns out `Arrays.fill` evaluates its second argument once, then assigns that single reference to every array element:

    public static void fill(Object[] a, Object val) {
        for (int i = 0, len = a.length; i < len; i++)
            a[i] = val;
    }

while `rg.add(new ArrayList<>());` in the loop creates a distinct object for each element.
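The aliasing is easy to observe: mutating the list through one slot is visible through every other slot. A small self-contained sketch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        int n = 3;

        // Arrays.fill: the ArrayList is created once, so every slot
        // holds a reference to the SAME list.
        @SuppressWarnings("unchecked")
        List<Integer>[] shared = new List[n];
        Arrays.fill(shared, new ArrayList<>());
        shared[0].add(42);
        System.out.println(shared[1]); // [42] -- the mutation shows through every slot

        // Loop + add: each iteration creates a fresh list.
        List<List<Integer>> distinct = new ArrayList<>();
        for (int i = 0; i < n; ++i) {
            distinct.add(new ArrayList<>());
        }
        distinct.get(0).add(42);
        System.out.println(distinct.get(1)); // [] -- other slots are unaffected
    }
}
```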

trick to sort map by value

A `TreeMap` can be used to sort a map by value:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> comparator.compare(map.get(o2), map.get(o1)));
        sorted.putAll(map);

However, there is a caveat: if two keys map to equal values, the comparator reports them as equal and the `TreeMap` treats them as the same key, dropping the entry (technically, updating the value of the existing key to the value the new key was supposed to carry).

The trick is to make the comparator never return 0, so no two keys ever compare as equal:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1: 1);
        sorted.putAll(map);
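One more caveat with the never-return-0 trick: the comparator is now inconsistent with `equals`, so the resulting map is only good for iterating in value order; key lookups like `get` and `containsKey` will always miss, since the search can never find an "equal" key. A small sketch (the sample map is made up for illustration):

```java
import java.util.Map;
import java.util.TreeMap;

public class Main {
    public static void main(String[] args) {
        Map<Character, Integer> map = Map.of('a', 2, 'b', 5, 'c', 2);

        // Never return 0, so keys with equal values are not collapsed into one entry
        TreeMap<Character, Integer> sorted =
                new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1 : 1);
        sorted.putAll(map);

        System.out.println(sorted.size());           // 3: both value-2 entries survive
        System.out.println(sorted.firstKey());       // b: highest value sorts first
        System.out.println(sorted.containsKey('a')); // false: lookups are broken
    }
}
```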

trick to iterate stream with index

With an `AtomicInteger`, you can get an index while traversing a Java collection as a stream (the stream must be sequential for the indices to follow encounter order):

        final int partition = 100;
        final AtomicInteger counter = new AtomicInteger();

        final ForkJoinTask<Integer> task = factorThreads.submit(() ->
                numbers.stream()
                        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / partition))
                        .values()
                        .stream().map(integers -> {
                            log.info("check the list {}", integers);
                            return integers.size();
                }).reduce(0, Integer::sum)
        );
        if (task.join() != 1200)
            log.error("the numbers do not tally");
        else log.info("all 1200 numbers counted");
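The same trick can be sketched in a self-contained form. In the snippet above, `numbers` and `factorThreads` are assumed to be defined elsewhere; here a made-up list of 1200 integers stands in for them:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main {
    public static void main(String[] args) {
        final int partition = 100;
        final AtomicInteger counter = new AtomicInteger();

        // 1200 made-up numbers standing in for the `numbers` collection above
        List<Integer> numbers = IntStream.range(0, 1200).boxed().collect(Collectors.toList());

        // Sequential stream: the counter hands out indices 0..1199 in encounter order,
        // so dividing by `partition` buckets the elements into chunks of 100
        Map<Integer, List<Integer>> groups = numbers.stream()
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / partition));

        int total = groups.values().stream().mapToInt(List::size).sum();
        System.out.println(groups.size()); // 12 buckets
        System.out.println(total);         // 1200 elements in total
    }
}
```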