trick to sort map by value

a TreeMap can be used to sort a map by value, by giving it a comparator that compares the values instead of the keys

        // descending by value; `map` is the source Map<Character, Integer>
        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> Integer.compare(map.get(o2), map.get(o1)));
        sorted.putAll(map);

however, there is a caveat: when two keys have the same value, the comparator returns 0, so TreeMap treats them as duplicate keys and drops the entry (technically, it updates the value of the existing key to the value of the new key being inserted).

the trick is to never let the comparator report two keys as equal (never return 0)

        Map<Character, Integer> sorted = new TreeMap<>((o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1 : 1);
        sorted.putAll(map);
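
putting it together, here is a self-contained sketch (the map contents are made up). note this comparator intentionally breaks the Comparator contract, so the resulting map is only good for iterating in sorted order; lookups like sorted.get(key) are no longer reliable:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SortByValue {
    // never return 0, so entries with equal values are all kept
    static Map<Character, Integer> sortByValueDesc(Map<Character, Integer> map) {
        Map<Character, Integer> sorted = new TreeMap<>(
                (o1, o2) -> map.get(o2) - map.get(o1) <= 0 ? -1 : 1);
        sorted.putAll(map);
        return sorted;
    }

    public static void main(String[] args) {
        Map<Character, Integer> map = new HashMap<>();
        map.put('a', 3);
        map.put('b', 1);
        map.put('c', 3); // same value as 'a' -- a naive value comparator would drop one

        // all three entries survive, iterated in descending value order
        System.out.println(sortByValueDesc(map));
    }
}
```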

trick to iterate stream with index

an `AtomicInteger` can provide an index while traversing a stream over a java collection.

        final int partition = 100;
        final AtomicInteger counter = new AtomicInteger();

        final ForkJoinTask<Integer> task = factorThreads.submit(() ->
                numbers.stream()
                        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / partition))
                        .values()
                        .stream()
                        .map(integers -> {
                            log.info("check the list {}", integers);
                            return integers.size();
                        })
                        .reduce(0, Integer::sum)
        );
        if (task.join() != 1200)
            log.error("the count does not tally");
        else log.info("all 1200 numbers counted");
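
a minimal self-contained sketch of the same idea (the names here are made up); note the counter trick is only deterministic on a sequential, ordered stream:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class IndexedStream {
    // pair each element with its position; only safe on a sequential, ordered stream
    static Map<Integer, String> withIndex(List<String> items) {
        AtomicInteger counter = new AtomicInteger();
        return items.stream()
                .collect(Collectors.toMap(it -> counter.getAndIncrement(), it -> it));
    }

    public static void main(String[] args) {
        System.out.println(withIndex(List.of("a", "b", "c"))); // {0=a, 1=b, 2=c}
    }
}
```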

force parallel processing

there are times when, even though the application requests parallel processing, the work could stick to one single thread without rotation.

for example, the case in https://lwpro2.dev/2021/06/11/jdk-spliterator-with-parallel-processing/.

this could be similar to thread affinity, https://en.wikipedia.org/wiki/Processor_affinity.

in java, one way to force the parallel processing is to signal the scheduler to rotate its time slicing, by having the current thread give up its time slice.

        stream.parallel()
                .peek(i -> {
                    try {
                        Thread.sleep(0);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                })

as such, even though the time quantum could be larger than the duration of the first-phase tasks, the thread can volunteer to round-robin back into the pool. the following parallel processing can then proceed across threads, instead of leaving the second-phase tasks to the original single thread.
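
a runnable sketch of the hint (the workload here is made up): `Thread.sleep(0)` in the middle gives the scheduler a chance to hand elements to other workers, and the sum check shows the hint doesn't change the result:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class ForceParallel {
    static long sumWithYield() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        long sum = IntStream.rangeClosed(1, 10_000).parallel()
                .peek(i -> {
                    threads.add(Thread.currentThread().getName());
                    try {
                        Thread.sleep(0); // hint the scheduler to rotate the time slice
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                })
                .asLongStream()
                .sum();
        // how many workers actually participated is up to the scheduler
        System.out.println("threads used: " + threads.size());
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumWithYield()); // 50005000
    }
}
```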

JDK Spliterator with Parallel Processing

I have created a fixed-size Spliterator to split a collection into fixed-size chunks.

https://github.com/openjdk/jdk/pull/2907

It's working as expected. With a collection of size greater than the threshold, it would trySplit and generate a new Spliterator.

And if the stream is parallel, it would pass the spliterator to different threads.
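
the PR has the actual implementation; as a rough self-contained sketch of the idea (my simplification, not the PR's code), a spliterator over an array can keep halving until the pieces reach the threshold:

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// hypothetical sketch of a fixed-size spliterator; names are made up
public class ChunkSpliterator<T> implements Spliterator<T> {
    private final T[] array;
    private int origin;      // current position, advanced as elements are consumed
    private final int fence; // one past the last index covered by this spliterator
    private final int chunk; // stop splitting once a piece is this small

    public ChunkSpliterator(T[] array, int chunk) {
        this(array, 0, array.length, chunk);
    }

    private ChunkSpliterator(T[] array, int origin, int fence, int chunk) {
        this.array = array;
        this.origin = origin;
        this.fence = fence;
        this.chunk = chunk;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (origin >= fence) return false;
        action.accept(array[origin++]);
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        int remaining = fence - origin;
        if (remaining <= chunk) return null; // small enough, keep in one piece
        int mid = origin + remaining / 2;
        Spliterator<T> prefix = new ChunkSpliterator<>(array, origin, mid, chunk);
        origin = mid; // this instance keeps the back half
        return prefix;
    }

    @Override
    public long estimateSize() { return fence - origin; }

    @Override
    public int characteristics() { return ORDERED | SIZED | SUBSIZED; }

    public static void main(String[] args) {
        Integer[] numbers = new Integer[100];
        Arrays.setAll(numbers, i -> i);
        long count = StreamSupport.stream(new ChunkSpliterator<>(numbers, 5), true).count();
        System.out.println(count); // 100
    }
}
```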

however, when trying to group the elements by thread, the code below does not always work:

        Map<String, List<Integer>> partition = new ForkJoinPool(10).submit(() ->
                StreamSupport.stream(new FixedSizeSpliterator<>(numbers.toArray(new Integer[0]), 5), true)
                        .collect(Collectors.groupingBy(i -> Thread.currentThread().getName(), Collectors.toList())))
                .join();

        partition.entrySet().stream().forEach(en -> log.warn("check key {} vs value {}", en.getKey(), en.getValue()));

most of the time, it equally divides the elements across the 10 threads. however, once in a while, it puts everything into a single thread.

after a lot of investigation, it turns out this could be because one thread is fast enough to handle the whole processing, which here is just a group-by operation.

so the solution is to just signal the pool to invoke another thread.

        Map<String, List<Integer>> partition2 = new ForkJoinPool(10).submit(() ->
                StreamSupport.stream(new FixedSizeSpliterator<>(numbers.toArray(new Integer[0]), 5), true)
                        .parallel()
                        .map(i -> {
                            try {
                                Thread.sleep(0);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return i;
                        })
                        .collect(Collectors.groupingBy(i -> Thread.currentThread().getName(), Collectors.toList())))
                .join();
        partition2.entrySet().stream().forEach(en -> log.warn("check key2 {} vs value {}", en.getKey(), en.getValue()));

output

17:56:03.074 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 6000 till 12000
17:56:03.079 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 9000 till 12000
17:56:03.080 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 7501 till 9001
17:56:03.080 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 7502 to 8251 till 9001
17:56:03.083 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 6751 till 7502
17:56:03.084 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 10500 till 12000
17:56:03.084 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 3000 till 6001
17:56:03.084 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 10501 to 11250 till 12000
17:56:03.085 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 4501 till 6001
17:56:03.085 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 3751 till 4502
17:56:03.085 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 9751 till 10501
17:56:03.087 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 1500 till 3001
17:56:03.087 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 1501 to 2251 till 3001
17:56:03.088 [ForkJoinPool-2-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 750 till 1501
17:56:03.087 [ForkJoinPool-2-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 4502 to 5251 till 6001
17:56:03.089 [main] WARN com.zg.d.TestSplitter - check key ForkJoinPool-2-worker-2 vs value [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
17:56:03.091 [main] WARN com.zg.d.TestSplitter - check key ForkJoinPool-2-worker-9 vs value [751, 752, 753, 754, 755, 756, 757, 758, 759, 760, 761, 762, 763, 764, 765, 766, 767, 768, 769, 770, 771, 772, 773, 774, 775, 776, 777, 778, 779, 780, 781, 782, 783, 784, 785, 786, 787, 788, 789, 790, 791, 792, 793, 794, 795, 796, 797, 798, 799, 800, 801, 802, 803, 804, 805, 806, 807, 808, 809, 810, 811, 812, 813, 814, 815, 816, 817, 818, 819, 820, 821, 822, 823, 824, 825, 826, 827, 828, 829,
17:56:03.117 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 6000 till 12000
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 9000 till 12000
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 7501 till 9001
17:56:03.118 [ForkJoinPool-3-worker-9] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 7502 to 8251 till 9001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 3000 till 6001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 4501 till 6001
17:56:03.120 [ForkJoinPool-3-worker-2] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 3001 to 3751 till 4502
17:56:03.120 [ForkJoinPool-3-worker-11] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 10500 till 12000
17:56:03.121 [ForkJoinPool-3-worker-11] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 10501 to 11250 till 12000
17:56:03.121 [ForkJoinPool-3-worker-4] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 1500 till 3001
17:56:03.122 [ForkJoinPool-3-worker-4] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 1501 to 2251 till 3001
17:56:03.123 [ForkJoinPool-3-worker-6] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 9001 to 9751 till 10501
17:56:03.123 [ForkJoinPool-3-worker-15] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 6001 to 6751 till 7502
17:56:03.124 [ForkJoinPool-3-worker-13] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 4502 to 5251 till 6001
17:56:03.124 [ForkJoinPool-3-worker-8] INFO com.zg.d.u.util.FixedSizeSpliterator - split from 0 to 750 till 1501
17:56:03.160 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-15 vs value [6752, 6753, 6754, 6755, 6756, 6757, 6758, 6759, 6760, 6761, 6762, 6763, 6764, 6765, 6766, 6767, 6768, 6769, 6770, 6771, 6772, 6773, 6774, 6775, 6776, 6777, 6778, 6779, 6780, 6781, 6782, 6783, 6784, 6785, 6786, 6787, 6788, 6789, 6790, 6791, 6792, 6793, 6794, 6795, 6796, 6797, 6798, 6799, 6800, 6801, 6802, 6803, 6804, 6805, 6806, 6807, 6808, 6809, 6810, 6811, 6812, 6813, 6814, 6815, 6816,
17:56:03.161 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-9 vs value [7502, 7503, 7504, 7505, 7506, 7507, 7508, 7509, 7510, 7511, 7512, 7513, 7514, 7515, 7516, 7517, 7518, 7519, 7520, 7521, 7522, 7523, 7524, 7525, 7526, 7527, 7528, 7529, 7530, 7531, 7532, 7533, 7534, 7535, 7536, 7537, 7538, 7539, 7540, 7541, 7542, 7543, 7544, 7545, 7546, 7547, 7548, 7549, 7550, 7551, 7552, 7553, 7554, 7555, 7556, 7557, 7558, 7559, 7560, 7561, 7562, 7563, 7564, 7565, 7566, 
17:56:03.162 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-11 vs value [10501, 10502, 10503, 10504, 10505, 10506, 10507, 10508, 10509, 10510, 10511, 10512, 10513, 10514, 10515, 10516, 10517, 10518, 10519, 10520, 10521, 10522, 10523, 10524, 10525, 10526, 10527, 10528, 10529, 10530, 10531, 10532, 10533, 10534, 10535, 10536, 10537, 10538, 10539, 10540, 10541, 10542, 10543, 10544, 10545, 10546, 10547, 10548, 10549, 10550, 10551, 10552, 10553, 10554, 10555, 10556,
17:56:03.163 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-8 vs value [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 
17:56:03.163 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-13 vs value [5252, 5253, 5254, 5255, 5256, 5257, 5258, 5259, 5260, 5261, 5262, 5263, 5264, 5265, 5266, 5267, 5268, 5269, 5270, 5271, 5272, 5273, 5274, 5275, 5276, 5277, 5278, 5279, 5280, 5281, 5282, 5283, 5284, 5285, 5286, 5287, 5288, 5289, 5290, 5291, 5292, 5293, 5294, 5295, 5296, 5297, 5298, 5299, 5300, 5301, 5302, 5303, 5304, 5305, 5306, 5307, 5308, 5309, 5310, 5311, 5312, 5313, 5314, 5315, 5316, 5317
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-4 vs value [1501, 1502, 1503, 1504, 1505, 1506, 1507, 1508, 1509, 1510, 1511, 1512, 1513, 1514, 1515, 1516, 1517, 1518, 1519, 1520, 1521, 1522, 1523, 1524, 1525, 1526, 1527, 1528, 1529, 1530, 1531, 1532, 1533, 1534, 1535, 1536, 1537, 1538, 1539, 1540, 1541, 1542, 1543, 1544, 1545, 1546, 1547, 1548, 1549, 1550, 1551, 1552, 1553, 1554, 1555, 1556, 1557, 1558, 1559, 1560, 1561, 1562, 1563, 1564, 1565, 1566,
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-6 vs value [9752, 9753, 9754, 9755, 9756, 9757, 9758, 9759, 9760, 9761, 9762, 9763, 9764, 9765, 9766, 9767, 9768, 9769, 9770, 9771, 9772, 9773, 9774, 9775, 9776, 9777, 9778, 9779, 9780, 9781, 9782, 9783, 9784, 9785, 9786, 9787, 9788, 9789, 9790, 9791, 9792, 9793, 9794, 9795, 9796, 9797, 9798, 9799, 9800, 9801, 9802, 9803, 9804, 9805, 9806, 9807, 9808, 9809, 9810, 9811, 9812, 9813, 9814, 9815, 9816, 9817,
17:56:03.164 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-10 vs value [4502, 4503, 4504, 4505, 4506, 4507, 4508, 4509, 4510, 4511, 4512, 4513, 4514, 4515, 4516, 4517, 4518, 4519, 4520, 4521, 4522, 4523, 4524, 4525, 4526, 4527, 4528, 4529, 4530, 4531, 4532, 4533, 4534, 4535, 4536, 4537, 4538, 4539, 4540, 4541, 4542, 4543, 4544, 4545, 4546, 4547, 4548, 4549, 4550, 4551, 4552, 4553, 4554, 4555, 4556, 4557, 4558, 4559, 4560, 4561, 4562, 4563, 4564, 4565, 4566, 4567
17:56:03.165 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-1 vs value [3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 3010, 3011, 3012, 3013, 3014, 3015, 3016, 3017, 3018, 3019, 3020, 3021, 3022, 3023, 3024, 3025, 3026, 3027, 3028, 3029, 3030, 3031, 3032, 3033, 3034, 3035, 3036, 3037, 3038, 3039, 3040, 3041, 3042, 3043, 3044, 3045, 3046, 3047, 3048, 3049, 3050, 3051, 3052, 3053, 3054, 3055, 3056, 3057, 3058, 3059, 3060, 3061, 3062, 3063, 3064, 3065, 3066,
17:56:03.165 [main] WARN com.zg.d.TestSplitter - check key2 ForkJoinPool-3-worker-2 vs value [2252, 2253, 2254, 2255, 2256, 2257, 2258, 2259, 2260, 2261, 2262, 2263, 2264, 2265, 2266, 2267, 2268, 2269, 2270, 2271, 2272, 2273, 2274, 2275, 2276, 2277, 2278, 2279, 2280, 2281, 2282, 2283, 2284, 2285, 2286, 2287, 2288, 2289, 2290, 2291, 2292, 2293, 2294, 2295, 2296, 2297, 2298, 2299, 2300, 2301, 2302, 2303, 2304, 2305, 2306, 2307, 2308, 2309, 2310, 2311, 2312, 2313, 2314, 2315, 2316, 2317,



spring retry

similar to a circuit breaker, spring retry is a mechanism to handle intermittent service unavailability and to fall back if the failure is sustained.

my code had been working well:

    @Recover
    public boolean altXX(String date){
        ......
        return false;
    }

    @Retryable(value = DataXXXException.class, maxAttempts = 10, backoff = @Backoff(delay = 30_000))
    public boolean xxCheck(String date) throws DataXXXException {
        try {
            //biz logic here
        } catch (SomeCheckedException e) { // placeholder for the checked exception the biz logic throws
            log.error("capturing the error", e);
            throw new DataXXXException("XX Data not yet ready");
        }
        return true;
    }

it would log the error several times before either returning true once the data became available, or returning false after 10 tries.

until at some point it started to enter xxCheck and then go straight to returning false, without logging any error.

that really confused me.

i then updated the recover method to take the DataXXXException as a parameter,

    @Recover
    public boolean altXX(DataXXXException e, String date){
        ...
        return false;
    }

which then, instead of returning false, stopped the thread and straight away threw an NPE.

2021-06-05 14:15:21,946 [main] ERROR o.s.b.SpringApplication - Application run failed java.lang.NullPointerException: null
...
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:746)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:93)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	... 35 common frames omitted
Wrapped by: org.springframework.retry.ExhaustedRetryException: Cannot locate recovery method; nested exception is java.lang.NullPointerException
	at org.springframework.retry.annotation.RecoverAnnotationRecoveryHandler.recover(RecoverAnnotationRecoveryHandler.java:70)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:142)
	at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
	at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:116)
	at org.springframework.retry.annotation.AnnotationAwareRetryOperationsInterceptor.invoke(AnnotationAwareRetryOperationsInterceptor.java:163)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
...
 org.springframework.retry.ExhaustedRetryException: Cannot locate recovery method; nested exception is java.lang.NullPointerException
....
Caused by: java.lang.NullPointerException: null

at the same time, there was another retryable, which started throwing

Wrapped by: org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted

and

ERROR while querying for country ccy java.sql.SQLException: HikariDataSource HikariDataSource (null) has been closed.

after some investigation, it turned out to be caused by code in the //biz logic here block which throws a RuntimeException (an NPE). that exception is not the declared retryable type and was not caught, so spring retry went straight to recovery; and since the recover method's parameter type does not match an NPE, it failed with "Cannot locate recovery method".

the calling thread was the `main` thread, which then tried to stop the application.

as the other retryable thread received the signal to stop, so did the HikariDataSource, which is why they threw the interrupted exception and the datasource-closed error.
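
the behavior makes sense with a plain-java sketch of what @Retryable/@Recover roughly do under the hood (my own simplification, not Spring Retry's code): only the declared exception type is retried, so anything else, like an NPE, escapes immediately:

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

public class RetrySketch {
    // retry `body` up to maxAttempts on exceptions of type `retryOn`;
    // on exhaustion, fall back to `recover`; any other exception escapes at once
    static <T> T retry(int maxAttempts, Callable<T> body,
                       Class<? extends Exception> retryOn,
                       Function<Exception, T> recover) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return body.call();
            } catch (Exception e) {
                if (!retryOn.isInstance(e)) throw e; // e.g. an NPE is not retried
                last = e;
            }
        }
        return recover.apply(last); // attempts exhausted: recover
    }

    public static void main(String[] args) throws Exception {
        // exhausts the retries, then falls back to recover
        boolean result = retry(3,
                () -> { throw new IllegalStateException("data not ready"); },
                IllegalStateException.class,
                e -> false);
        System.out.println(result); // false
    }
}
```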

exception in threads

when a forked-out thread is running a task and an exception happens, the exception could be handled differently depending on how the thread/thread pool was constructed and how the task was handed over.

for ForkJoinPool, if the task is handed over via `execute`, a failing task will propagate the exception until it is either handled or, if uncaught, captured by the `UncaughtExceptionHandler` (which is called by the JVM).

when the task is handed over via `submit` though, the task is wrapped into an AdaptedRunnableAction, which does not propagate the exception.

I think what the JCP/JVM designers are trying to do is trust that developers know the difference between the two calls, and will handle the exception from the returned task themselves.

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

which in turn propagates the exception through `setExceptionalCompletion`, whose `internalPropagateException` is a no-op for submitted tasks:

    private int setExceptionalCompletion(Throwable ex) {
        int s = recordExceptionalCompletion(ex);
        if ((s & DONE_MASK) == EXCEPTIONAL)
            internalPropagateException(ex);
        return s;
    }
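
the difference can be seen in a small demo (my own sketch, not JDK code): with `execute`, the exception reaches the pool's `UncaughtExceptionHandler`; with `submit`, it stays inside the returned task until the caller joins:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class FjpExceptions {
    // execute(): the exception propagates off the worker, reaching the handler
    static boolean executeReachesHandler() throws InterruptedException {
        CountDownLatch handled = new CountDownLatch(1);
        ForkJoinPool pool = new ForkJoinPool(2,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                (thread, ex) -> handled.countDown(), // uncaught exception handler
                false);
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        boolean reached = handled.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        return reached;
    }

    // submit(): the exception is captured in the task and only surfaces on join()
    static boolean submitSurfacesOnJoin() {
        ForkJoinPool pool = new ForkJoinPool(2);
        ForkJoinTask<?> task = pool.submit((Runnable) () -> { throw new IllegalStateException("boom"); });
        try {
            task.join();
            return false;
        } catch (RuntimeException e) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(executeReachesHandler() + " " + submitSurfacesOnJoin());
    }
}
```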

java.util.concurrent.RejectedExecutionException

The app was encountering this exception while submitting a new task to the pool. There was not much detail in the error message.

Application run failed java.util.concurrent.RejectedExecutionException: null
at java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:2339)
at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2430)
at java.util.concurrent.ForkJoinPool.invokeAll(ForkJoinPool.java:2733)

Turns out the issue was that the pool was a common pool which had been shut down (so no longer accepting new tasks) in some other part of the application.
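
a small demo of the failure mode (a private pool is used here, since `ForkJoinPool.commonPool()` itself ignores `shutdown()`):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;

public class RejectedDemo {
    // returns true if submitting to a shut-down pool is rejected
    static boolean submitAfterShutdown() {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.shutdown(); // pool stops accepting new tasks
        try {
            pool.submit(() -> 1);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown()); // true
    }
}
```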