protobuf operation

Quite impressed by protobuf performance: the built-in merge implementation even outperforms the streaming approach used earlier:

models.toBuilder().clear().mergeFrom(
                Files.list(..).filter(Files::isRegularFile)
                        .map(Path::toFile)
                        .filter(file -> ...)
                        .collect(Collectors.toList())
                        .parallelStream()
                        .map(file -> ..)   // read each file into a protobuf message
                        .reduce((m1, m2) -> m1.toBuilder().mergeFrom(m2).build())
                        .orElseGet(() -> Stress.StressModels.getDefaultInstance())
            ).build();

models.getModelsList().parallelStream().forEach(...)

performs even better than

models = Files.list(Paths.get(..)).filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> ..)
                    .collect(Collectors.toList())
                    .parallelStream()
                    .map(file -> ..)
                    .flatMap(m -> m.getModelsList().stream());

models.parallel().forEach(...)
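
In both versions the per-file read step is elided (map(file -> ..)). A minimal sketch of what that step could look like, assuming each file holds one serialized Stress.StressModels message (the helper name and error handling are assumptions, not the original code):

    // hypothetical helper: parse a single file into a StressModels message
    private static Stress.StressModels readModels(File file) {
        try (FileInputStream fis = new FileInputStream(file)) {
            return Stress.StressModels.parseFrom(fis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }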

Files.list parallel stream

I was really confused by the output from the Files.list stream:

16:25:29.032 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.031 [main] INFO StressTest - compute from parallel file with collection
16:25:29.032 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.038 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel file with collection
16:25:29.032 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel file with collection
16:25:29.039 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel file with collection
16:25:29.037 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.037 [main] INFO StressTest - compute from parallel file with collection
16:25:29.039 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel file with collection
16:25:29.039 [main] INFO StressTest - parallel compute for file names with collect
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.042 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-3] INFO StressTest - compute from parallel string with collection
16:25:29.042 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-1] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [ForkJoinPool.commonPool-worker-2] INFO StressTest - compute from parallel string with collection
16:25:29.043 [main] INFO StressTest - parallel compute for file names
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.045 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.048 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.049 [main] INFO StressTest - compute from parallel string
16:25:29.050 [main] INFO StressTest - compute from parallel string
16:25:29.050 [main] INFO StressTest - compute from parallel string
16:25:29.050 [main] INFO StressTest - compute from parallel string
16:25:29.050 [main] INFO StressTest - parallel compute for files
16:25:29.051 [main] INFO StressTest - compute from parallel file
16:25:29.051 [main] INFO StressTest - compute from parallel file
16:25:29.051 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.052 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.053 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file
16:25:29.054 [main] INFO StressTest - compute from parallel file

Process finished with exit code 0

corresponding to this code:

        try {
            Files.list(Paths.get("...."))
                    .parallel()
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> file.getName().startsWith(Constants.AB_MODEL))
                    .collect(Collectors.toList())
                    .parallelStream()
                    .forEach(s -> {
                        log.info("compute from parallel file with collection");
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }

        log.info("parallel compute for file names with collect");
        try {
            Files.list(Paths.get("...."))
                    .parallel()
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> file.getName().startsWith(Constants.AB_MODEL))
                    .map(file -> file.getName())
                    .collect(Collectors.toList())
                    .parallelStream()
                    .forEach(s -> {
                        log.info("compute from parallel string with collection");
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }

        log.info("parallel compute for file names");
        try {
            Files.list(Paths.get("...."))
                    .parallel()
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> file.getName().startsWith(Constants.AB_MODEL))
                    .map(file -> file.getName())
                    .forEach(s -> {
                        log.info("compute from parallel string");
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }

        log.info("parallel compute for files");
        try {
            Files.list(Paths.get("...."))
                    .parallel()
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> file.getName().startsWith(Constants.AB_MODEL))
                    .forEach(s -> {
                        log.info("compute from parallel file");
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }

So the parallel stream from Files.list ends up processing all the files (~50 files) on a single thread.

Only when there is a collect followed by a new parallel stream does the work get split across the common pool.

After a lot of investigation and research, it turns out the JDK has a rather poorly crafted implementation for this parallel case:

source: http://mail.openjdk.java.net/pipermail/core-libs-dev/2015-July/034539.html

So basically, when the spliterator is split, the unsized IteratorSpliterator keeps estimating Long.MAX_VALUE remaining elements:

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

source: https://stackoverflow.com/questions/34341656/why-is-files-list-parallel-stream-performing-so-much-slower-than-using-collect

code: https://github.com/openjdk/jdk/blob/d234388042e00f6933b4d723614ef42ec41e0c25/src/java.base/share/classes/java/util/Spliterators.java#L1784

            do { a[j] = i.next(); } while (++j < n && i.hasNext());
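
So the practical workaround, which is what the collect-based snippets above already do, is to materialize the paths into a list first; the list's spliterator reports an exact size, so the parallel stream can actually split the work. A minimal sketch reusing the placeholders from the snippets above:

    try (Stream<Path> paths = Files.list(Paths.get("...."))) {
        List<File> files = paths
                .filter(Files::isRegularFile)
                .map(Path::toFile)
                .filter(file -> file.getName().startsWith(Constants.AB_MODEL))
                .collect(Collectors.toList());

        // the ArrayList-backed spliterator knows its size, so the common pool
        // splits the ~50 files across its workers instead of one thread
        files.parallelStream().forEach(file -> { /* compute */ });
    } catch (IOException e) {
        e.printStackTrace();
    }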

spring boot datasource configuration

Have spent quite some time trying to auto-apply the datasource configuration from yml; turns out it can be achieved through the

@ConfigurationProperties

annotation.

So with a configuration in yaml,

spring:
  profiles: dev
  datasource:
    hikari:
      auto-commit: true
      connection-timeout: 30000
      maximum-pool-size: 20
    url: jdbc:sqlserver://..
    username: 
    password: 

and a bean configuration

    @Bean(name = "RODataSource")
    @ConfigurationProperties("spring.datasource.hikari")
    public DataSource getDataSource(){
        HikariDataSource dataSource = DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .url(url)
                .username(username).password(pwd)
                .driverClassName(driver)
                .build();
        return dataSource;
    }

@ConfigurationProperties is able to inspect the returned bean and bind the corresponding properties onto it

(auto-commit, pool size and timeout value for example)
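
The url, username, pwd and driver fields used in the bean above have to come from somewhere. A minimal sketch of the enclosing configuration class, assuming they are injected with @Value (the driver-class-name key is an assumption; it does not appear in the yaml above):

    @Configuration
    public class DataSourceConfig {

        // assumed field injection matching the yaml block above
        @Value("${spring.datasource.url}")
        private String url;
        @Value("${spring.datasource.username}")
        private String username;
        @Value("${spring.datasource.password}")
        private String pwd;
        // assumption: the driver class name comes from a property not shown in the yaml
        @Value("${spring.datasource.driver-class-name}")
        private String driver;

        // ... the getDataSource() @Bean method from above lives here
    }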

transport failed with grpc

While a gRPC server is serving a stream of responses, if the responses are sent concurrently, it looks like it will fall into:

INFO: Transport failed
java.lang.IllegalStateException: Stream 3 sent too many headers EOS: false
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.validateHeadersSentState(DefaultHttp2ConnectionEncoder.java:157)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders0(DefaultHttp2ConnectionEncoder.java:230)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeHeaders(DefaultHttp2ConnectionEncoder.java:150)
	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DecoratingHttp2FrameWriter.writeHeaders(DecoratingHttp2FrameWriter.java:45)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.sendResponseHeaders(NettyServerHandler.java:707)
	at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.write(NettyServerHandler.java:626)

The solution is to stream serially instead:

..
//.parallel()  // disable the parallel stream
.mapToObj(value -> ...newBuilder().setMessage(value)....build())
.forEach(reply -> responseObserver.onNext(reply));
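
For context, a minimal sketch of the serial version inside a server-streaming handler; the method, request and reply types here are hypothetical, not the original code. The point is simply that onNext() is never called from more than one thread at a time:

    @Override
    public void streamReplies(Request request, StreamObserver<Reply> responseObserver) {
        IntStream.range(0, request.getCount())
                // no .parallel(): keep every onNext() call on this single thread
                .mapToObj(value -> Reply.newBuilder().setMessage(String.valueOf(value)).build())
                .forEach(responseObserver::onNext);
        responseObserver.onCompleted();
    }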

java linkage error

https://docs.jboss.org/jbossas/docs/Server_Configuration_Guide/4/html/Class_Loading_and_Types_in_Java-LinkageErrors___Making_Sure_You_Are_Who_You_Say_You_Are.html

Basically, it's the same class (same fully qualified class name) being loaded by different class loaders. It's a constraint check implemented by the JVM since Java 1.2.

So permission given to a class from one classloader does not, by default, grant the same permission to the "same" class loaded by another classloader.
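
A minimal sketch of the underlying behaviour, with a hypothetical com.example.Model class and jar URL (both are assumptions, not from the original setup; assume this runs inside a method that declares throws Exception):

    // two independent class loaders, neither delegating to the other for this jar
    URL jarUrl = new URL("file:/path/to/model.jar");   // hypothetical jar containing com.example.Model
    URLClassLoader cl1 = new URLClassLoader(new URL[]{jarUrl}, null);
    URLClassLoader cl2 = new URLClassLoader(new URL[]{jarUrl}, null);

    Class<?> a = cl1.loadClass("com.example.Model");
    Class<?> b = cl2.loadClass("com.example.Model");

    // same fully qualified name, but two distinct runtime types; mixing
    // instances of them is what surfaces as LinkageError / ClassCastException
    System.out.println(a == b); // false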

Apache Parquet writer on Windows

There was an issue when running the Parquet writer on Windows:

'Change File Mode By Mask error' (5): Access is denied.

Turns out this is because Parquet uses the Hadoop filesystem API to access the file, which in turn requires permissions on the /tmp/hive folder.

the solution is to run

winutils.exe chmod -R 777 C:\tmp\hive

However, note that the drive letter should be the same as where HADOOP_HOME (the winutils location) resides.
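
Relatedly, if the writer cannot locate winutils at all: Hadoop looks the location up via HADOOP_HOME or the hadoop.home.dir system property, so one option is to set it programmatically before any Parquet/Hadoop call (the path below is an assumption):

    // assumption: winutils.exe lives in C:\hadoop\bin, on the same drive as C:\tmp\hive
    System.setProperty("hadoop.home.dir", "C:\\hadoop");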

spark class conflict with uber jar

When running Spark, an exception was thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: java.lang.NullPointerException

Turns out this is due to class conflicts.

As I have some libraries bundled in the uber jar, for example

com.google.protobuf

hence I requested Spark to use my own libraries, instead of the ones bundled with Spark, via

--conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true

This works as intended to point to the right libraries; however, it seems there are more libraries duplicated between the uber jar and Spark.

Hence, instead of using the above option, turning to the Maven Shade plugin sorted out the issue.

issue for objects with circular reference in spark

There is a limitation in Spark at the moment: it throws an exception immediately if the object has a circular reference.

It turns out the issue only exists with the default serializer (the Java bean encoder). Switching to Kryo, which also has better performance, handles the circular reference just fine:

instead of using

// this threw for objects with a circular reference
Encoder<Model> encoder = Encoders.bean(Model.class);
Dataset<Model> rowData = spark.createDataset(models, encoder);

// this worked
Encoder<Model> encoder = Encoders.kryo(Model.class);
Dataset<Model> rowData = spark.createDataset(models, encoder);

Workaround for protobuf buffer size limitation

There is a data size constraint on protobuf: it uses an `int` for the buffer size, which limits the maximum serialized message to 2 GB.

There are two possible workarounds, which are basically the same concept:

flush to the same stream in batches

As an example, it could batch per 1 million rows, or whenever the serialized size goes above 268 MB (2^28 bytes):

            while (rs != null && rs.next()) {
                models.addModels(..newBuilder().set...(rs.getString("..")...)
                        .build());


                if(++rowcount > 1_000_000){
//                if(rowcount > 1_000_000 || models.build().getSerializedSize() > Math.pow(2,28)){
                    rowcount=0;

                    //flush by batch
                    try (FileOutputStream fos = new FileOutputStream(Constants.MODEL_PB_FILE, true)) {
                        models.build().writeTo(fos);
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    models.clear();
                }
            }
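
Reading that file back needs nothing special: concatenating serialized protobuf messages onto one stream is equivalent to merging them, so the repeated models from every flushed batch simply accumulate. A minimal sketch of the read side (type name as assumed earlier, and only valid while the total stays within the same 2 GB limit):

    try (FileInputStream fis = new FileInputStream(Constants.MODEL_PB_FILE)) {
        // a single parse of the concatenated batches yields the merged message
        Stress.StressModels all = Stress.StressModels.parseFrom(fis);
    } catch (IOException e) {
        e.printStackTrace();
    }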

Alternatively, the batches could be written to, and read back from, separate streams.

                if(++rowcount >= 1_000_000){
                    rowcount=0;

                    //flush by batch
                    try (FileOutputStream fos = new FileOutputStream(CACHE_FILE + currentFileIndex++, true)) {
                        models.build().writeTo(fos);
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    models.clear();
                }

Same for the read side:

            Files.list(Paths.get(Constants.CACHE_FILE_DIR + File.separator+Constants.PB_FILE)).filter(Files::isRegularFile)
                        .map(Path::toFile)
                        .filter(file -> file.getName().startsWith(Constants.PB_FILE))
                        .parallel().map(file -> readFile(file))
                        .reduce(....)