spring kafka seek

I have a Kafka consumer which, after resume, picks up from the last committed offset and consumes sequentially from there, even after several calls to seekToEnd().

Logs

//before pause
2021-10-09 15:44:18,569 [fx-kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024115]   
2021-10-09 15:44:28,573 [fx-kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024116]   

//paused

//called seekToEnd before resume

//after resume
//called seekToEnd several times before and after resume
2021-10-09 15:45:13,603 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024117]   
2021-10-09 15:45:53,610 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024118]  
2021-10-09 15:46:03,612 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024119]   
2021-10-09 15:46:13,613 [kafka-consumer-0-C-1] INFO  c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024120] 

Turns out this was actually due to an issue with the implementation of my subclass extending AbstractConsumerSeekAware.

At consumer start-up, I do a manual seek in onPartitionsAssigned, and so far this had been working well.

However, now that there is a need to pause/resume the consumer, it turns out the overriding method that does the manual seek skipped maintaining the seek callback in the callbacks field (the override never delegated to super).

So when seekToEnd() was invoked, it was really iterating over an empty map.
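
A minimal sketch of the fix, assuming spring-kafka's AbstractConsumerSeekAware (the class name is taken from the logs; the seek logic itself is illustrative): the overriding onPartitionsAssigned must delegate to super, otherwise the internal callbacks map stays empty and the no-arg seekToEnd() has nothing to seek.

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.springframework.kafka.listener.AbstractConsumerSeekAware;
    import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;

    public class HarvestConsumer extends AbstractConsumerSeekAware {

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                         ConsumerSeekCallback callback) {
            // this registers the seek callback so that seekToEnd()/seekToBeginning() can find it;
            // it was the missing piece in my override, so seekToEnd() iterated an empty map
            super.onPartitionsAssigned(assignments, callback);

            // placeholder for the original manual start-up seek
            assignments.forEach((tp, offset) ->
                    callback.seek(tp.topic(), tp.partition(), offset));
        }
    }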

sql server parameter sniffing

I was using a PreparedStatement to load some data out of a SQL Server database, which had been working fine for quite some time. Then recently, for one day's data, the query was not able to finish (it hung reading the response from the database):

      "stackTrace": [
        {
          "methodName": "socketRead0",
          "fileName": "SocketInputStream.java",
          "lineNumber": -2,
          "className": "java.net.SocketInputStream",
          "nativeMethod": true
        },
        {
          "methodName": "socketRead",
          "fileName": "SocketInputStream.java",
          "lineNumber": 116,
          "className": "java.net.SocketInputStream",
          "nativeMethod": false
        },
        {
          "methodName": "read",
          "fileName": "SocketInputStream.java",
          "lineNumber": 171,
          "className": "java.net.SocketInputStream",
          "nativeMethod": false
        },
        {
          "methodName": "read",
          "fileName": "SocketInputStream.java",
          "lineNumber": 141,
          "className": "java.net.SocketInputStream",
          "nativeMethod": false
        },
        {
          "methodName": "read",
          "fileName": "IOBuffer.java",
          "lineNumber": 2058,
          "className": "com.microsoft.sqlserver.jdbc.TDSChannel",
          "nativeMethod": false
        },
        {
          "methodName": "readPacket",
          "fileName": "IOBuffer.java",
          "lineNumber": 6617,
          "className": "com.microsoft.sqlserver.jdbc.TDSReader",
          "nativeMethod": false
        },
        {
          "methodName": "nextPacket",
          "fileName": "IOBuffer.java",
          "lineNumber": 6567,
          "className": "com.microsoft.sqlserver.jdbc.TDSReader",
          "nativeMethod": false
        },
        {
          "methodName": "ensurePayload",
          "fileName": "IOBuffer.java",
          "lineNumber": 6540,
          "className": "com.microsoft.sqlserver.jdbc.TDSReader",
          "nativeMethod": false
        },
        {
          "methodName": "skip",
          "fileName": "IOBuffer.java",
          "lineNumber": 7200,
          "className": "com.microsoft.sqlserver.jdbc.TDSReader",
          "nativeMethod": false
        },
        {
          "methodName": "skipValue",
          "fileName": "dtv.java",
          "lineNumber": 3362,
          "className": "com.microsoft.sqlserver.jdbc.ServerDTVImpl",
          "nativeMethod": false
        },
        {
          "methodName": "skipValue",
          "fileName": "dtv.java",
          "lineNumber": 162,
          "className": "com.microsoft.sqlserver.jdbc.DTV",
          "nativeMethod": false
        },
        {
          "methodName": "skipValue",
          "fileName": "Column.java",
          "lineNumber": 152,
          "className": "com.microsoft.sqlserver.jdbc.Column",
          "nativeMethod": false
        },
        {
          "methodName": "skipColumns",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 216,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },
        {
          "methodName": "loadColumn",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 770,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },
        {
          "methodName": "getterGetColumn",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 2036,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },
        {
          "methodName": "getValue",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 2054,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },
        {
          "methodName": "getValue",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 2040,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },
        {
          "methodName": "getString",
          "fileName": "SQLServerResultSet.java",
          "lineNumber": 2525,
          "className": "com.microsoft.sqlserver.jdbc.SQLServerResultSet",
          "nativeMethod": false
        },

The same query, however, works well when run directly in the SQL tool, or when run with a plain Java Statement.

Only after I added a dummy where clause “… and 1=1“ did the PreparedStatement suddenly return the result in time again.

In the beginning, I thought it strange that the dummy clause made the difference. Turns out, this was a problem with SQL Server parameter sniffing.

The dummy where clause worked only because SQL Server now sees it as a different query, hence it does not reuse the previously cached execution plan.

This can be confirmed by adding

option (recompile)

to the query. This makes SQL Server compile a fresh execution plan instead of reusing the previously cached one; with it, even the original query, without the dummy where clause, performs well again.
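
For reference, a minimal JDBC-side sketch of that workaround. The connection string, table, and parameter below are made up; the only point is the OPTION (RECOMPILE) hint at the end of the query text.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class RecompileSketch {
        public static void main(String[] args) throws Exception {
            // placeholder connection details
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:sqlserver://localhost;databaseName=mydb", "user", "password");
                 // OPTION (RECOMPILE) makes SQL Server compile a fresh plan for this execution
                 // instead of reusing the cached, parameter-sniffed one
                 PreparedStatement ps = conn.prepareStatement(
                         "SELECT some_col FROM some_table WHERE business_date = ? OPTION (RECOMPILE)")) {
                ps.setString(1, "2021-10-09");
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getString(1));
                    }
                }
            }
        }
    }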

The current database-level parameter sniffing setting can be checked from the database scoped configurations:

SELECT * FROM sys.database_scoped_configurations;

https://www.databasejournal.com/features/mssql/turning-off-parameter-sniffing-for-a-sql-server-database-by-default.html

==========================

Looking further into this, the execution was stuck on the previously cached execution plan.

It seems the first plan (shown in blue) was not working out; eventually a new plan (shown in green) was created.

The wait on the blue plan was HTDELETE, which relates to a change made in SQL Server 2014:

SQL Server 2014 now uses one shared hash table instead of per-thread copy.
This provides the benefit of significantly lowering the amount of memory
required to persist the hash table but, as you can imagine, the multiple
threads depending on that single copy of the hash table must synchronize with
each other before, for example, deallocating the hash table. To do so, those
threads wait on the HTDELETE (Hash Table DELETE) wait type.

https://social.msdn.microsoft.com/Forums/en-US/8579f864-cbdc-49b9-be26-d47af61df56d/sql-server-2014-wait-info-htdelete?forum=sql14db

Likely this is a bug in SQL Server, which was hit when running the program using parallelStream(), whereas stream() would work.*

*There are multiple processes hitting the same query.

TimSort

When I was writing a comparator using a lambda, Java threw this exception:

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:777)
        at java.util.TimSort.mergeAt(TimSort.java:514)
        at java.util.TimSort.mergeCollapse(TimSort.java:441)
        at java.util.TimSort.sort(TimSort.java:245)
        at java.util.Arrays.sort(Arrays.java:1512)
        at java.util.ArrayList.sort(ArrayList.java:1464)
        at java.util.stream.SortedOps$RefSortingSink.end(SortedOps.java:392)
        at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
        at java.util.stream.Sink$ChainedReference.end(Sink.java:258)
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:500)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)

It seems `TimSort` has a safety check to detect Comparator contract violations; however, the check does not always trigger.

In this case the comparator is broken because the `(int) (o2.val - o1.val)` cast truncates fractional differences to 0: for example 0.5 and 1.2 compare as equal, yet they compare differently against 2.1, which violates the contract. Since `domains.clear()` is commented out, the list keeps growing across iterations, and only once it is large enough for TimSort to start merging runs can the violation be detected, which is why the issue only shows up after running enough times:

    @Test
    public void test3() {
        List<Domain> domains = new ArrayList<>();
        for (int i = 0; i < 1_000; i++) {
//            domains.clear();
            domains.add(Domain.builder().val(0.1).name("1").build());
            domains.add(Domain.builder().val(0.5).name("2").build());
            domains.add(Domain.builder().val(1.2).name("3").build());
            domains.add(Domain.builder().val(2.1).name("4").build());
            Collections.shuffle(domains);
            log.info("{}th start sorting {}", i, domains);
            log.info("check sorted {}", domains.parallelStream().sorted((o1, o2) -> {
                        final int diff = (int) (o2.val - o1.val);
                        log.info("check {} - {} = {}", o1, o2, diff);
                        return diff;
                    }).collect(Collectors.toList())
            );
        }

    }
    @Test
    public void test4() {
        List<Domain> domains = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
//            domains.clear();
            domains.add(Domain.builder().val(0.1).name("1").build());
            domains.add(Domain.builder().val(0.5).name("2").build());
            domains.add(Domain.builder().val(1.2).name("3").build());
            domains.add(Domain.builder().val(2.1).name("4").build());
            Collections.shuffle(domains);
//            log.info("{}th start sorting {}", i, domains);
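            // fix: Double.compare keeps the comparator consistent, so the TimSort check never fires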
            log.info("check sorted {}", domains.parallelStream().sorted((o1, o2) -> Double.compare(o2.val, o1.val)).collect(Collectors.toList()).size()
            );
        }

    }
}

@Data
@Builder
class Domain {
    double val;
    String name;
}

kafka rebalance

I have set up a program to consume from a Kafka topic.

It had been running fine until ~25% of the messages (out of 2/3 million rows) were consumed, at which point a lot of rebalances started happening.

I have tried to increase `max.poll.interval.ms`, reduce `max.poll.records`, and process the messages asynchronously. I have also tried to set up the `onPartitionsAssigned` and `onPartitionsRevoked` methods. However, the rebalances keep happening, and the program keeps getting stuck at 25%.
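
For reference, a minimal sketch of the consumer settings that were tuned, using the plain Kafka consumer API; the broker address, group id, topic, and the exact values are illustrative, not the real ones.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TunedConsumer {
        public static KafkaConsumer<String, String> build() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "harvest-group");           // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // allow more time between poll() calls before the broker evicts the consumer ...
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
            // ... and fetch fewer records per poll so each processing loop stays short
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("harvest-topic"));       // placeholder
            return consumer;
        }
    }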

After a lot of investigation and trials, it turns out this was due to the memory footprint of the program.

As more messages are put onto the heap (several snap * 25% * 2/3 million rows), the JVM gets busy running GC threads, which eventually blocks the consumer from sending back its heartbeat in time.

Filtering the messages, or running this on a server with more memory, fixes the issue.

mockito matchers

When mocking a JDBC query, the statement below was throwing an exception:

doReturn("result").when(jdbc).queryForObject(anyString(), String.class);

Turns out this was due to Mockito not allowing a mix of matchers and raw arguments.

Changing the above to

doReturn("result").when(jdbc).queryForObject(anyString(), eq(String.class));

worked.
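
A self-contained sketch of the failing and the working stubbing, assuming Spring's JdbcTemplate as the mocked type (the original jdbc object isn't shown, so this is illustrative):

    import static org.mockito.ArgumentMatchers.anyString;
    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.doReturn;
    import static org.mockito.Mockito.mock;

    import org.springframework.jdbc.core.JdbcTemplate;

    public class JdbcMockSketch {
        public static void main(String[] args) {
            JdbcTemplate jdbc = mock(JdbcTemplate.class);

            // fails with InvalidUseOfMatchersException: anyString() is a matcher,
            // but String.class is passed as a raw value
            // doReturn("result").when(jdbc).queryForObject(anyString(), String.class);

            // works: once one argument uses a matcher, every argument must use one
            doReturn("result").when(jdbc).queryForObject(anyString(), eq(String.class));

            System.out.println(jdbc.queryForObject("select 1", String.class)); // result
        }
    }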

Arrays.fill

The two code blocks below produce different results:

        List<Integer>[] rg = new List[n];
        Arrays.fill(rg, new ArrayList<>());
...
        List<List<Integer>> rg = new ArrayList<>();
        for (int i = 0; i < n; ++i) {
            rg.add(new ArrayList<>());
        }
...

Turns out, Arrays.fill evaluates its second argument once, then assigns that same object reference to every array element:

    public static void fill(Object[] a, Object val) {
        for (int i = 0, len = a.length; i < len; i++)
            a[i] = val;
    }

while `rg.add(new ArrayList<>());` creates a separate object for each element.
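
A minimal sketch showing the consequence (array size and the added value are arbitrary):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FillDemo {
        public static void main(String[] args) {
            List<Integer>[] rg = new List[3];
            Arrays.fill(rg, new ArrayList<>()); // one ArrayList instance shared by every slot
            rg[0].add(42);
            System.out.println(rg[1]); // [42] - every slot points to the same list
            System.out.println(rg[2]); // [42]
        }
    }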

trick to sort map by value

A TreeMap can be used to sort a map by value:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> comparator.compare(map.get(o2), map.get(o1)));
        sorted.putAll(map);

However, there is a caveat: if two keys map to the same value, the TreeMap drops the entry (technically, it treats the new key as equal to an existing key and just re-assigns that existing key's value, instead of inserting the new key).

The trick is to make the comparator never return 0:

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1: 1);
        sorted.putAll(map);
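
A minimal usage sketch of the trick (the input map is made up). One more caveat worth noting: because the comparator never returns 0, the resulting map is really only good for iteration; lookups like get() or containsKey() stop working, since they rely on the comparator finding an equal key.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    public class SortByValueDemo {
        public static void main(String[] args) {
            Map<Character, Integer> map = new HashMap<>();
            map.put('a', 3);
            map.put('b', 1);
            map.put('c', 3); // same value as 'a' - a plain value comparator would collapse these

            Map<Character, Integer> sorted =
                    new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1 : 1);
            sorted.putAll(map);

            System.out.println(sorted);          // keys ordered by descending value, all entries kept
            System.out.println(sorted.get('a')); // null - key lookups no longer work with this comparator
        }
    }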