spark class conflict with uber jar

when running spark, the following exception was thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.io.IOException: java.lang.NullPointerException

turns out this is due to class conflicts.

as I have some libraries bundled in the uber jar, for example

com.google.protobuf

hence I have asked spark to use my own libraries, instead of the ones bundled with spark, via

--conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true

this works as intended for pointing to the right libraries. however, it seems there are more libraries duplicated between the uber jar and spark.

hence, instead of using the above option, switching to the maven shade plugin (to relocate the conflicting packages) sorted out the issue.
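
for reference, a minimal sketch of the shade plugin configuration in pom.xml, assuming the conflicting package is com.google.protobuf (the plugin version and relocation prefix are only examples):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- move the bundled protobuf classes to a private package
               so they no longer clash with spark's own copy -->
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>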

issue with circular object references in spark

there is a limitation in spark at the moment: it throws an exception immediately if the object has a circular reference.

it turns out the issue is only with the default (java) serializer. switching to kryo instead, which also has better performance, takes care of the circular reference just fine:

instead of using

//this fails on a circular reference
Encoder<Model> encoder = Encoders.bean(Model.class);
Dataset<Model> rowData = spark.createDataset(models, encoder);

use

//this worked
Encoder<Model> encoder = Encoders.kryo(Model.class);
Dataset<Model> rowData = spark.createDataset(models, encoder);
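
as a minimal self-contained sketch, using a hypothetical self-referencing Node bean rather than the project's actual Model class:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class KryoCircularReference {

    // hypothetical bean whose instances can point at each other, forming a cycle
    public static class Node implements Serializable {
        public long id;
        public Node other;

        public Node() { }
        public Node(long id) { this.id = id; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kryo-circular-reference")
                .master("local[*]")
                .getOrCreate();

        Node a = new Node(1);
        Node b = new Node(2);
        a.other = b;
        b.other = a;   // cycle: a -> b -> a

        List<Node> nodes = Arrays.asList(a, b);

        // Encoders.bean(Node.class) would trip over the self-referencing field,
        // while the kryo encoder serializes the whole object graph as binary
        Encoder<Node> encoder = Encoders.kryo(Node.class);
        Dataset<Node> ds = spark.createDataset(nodes, encoder);

        System.out.println(ds.count());
        spark.stop();
    }
}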

Workaround for protobuf buffer size limitation

There is a data size constraint on protobuf: it uses the `int` type for the buffer size, which limits a single message to 2GB when serializing.

There are two possible workarounds, which are basically the same concept:

flush to the same stream by batch

as an example, it could flush either every 1 million rows, or once the accumulated data grows above 268 MB (2^28 bytes):

            while (rs != null && rs.next()) {
                models.addModels(..newBuilder().set...(rs.getString("..")...)
                        .build());

                //flush every 1 million rows
                //alternatively, also flush once the batch grows past 2^28 bytes:
                //if(rowcount > 1_000_000 || models.build().getSerializedSize() > Math.pow(2,28)){
                if (++rowcount > 1_000_000) {
                    rowcount = 0;

                    //flush the batch, appending to the same file
                    try (FileOutputStream fos = new FileOutputStream(Constants.MODEL_PB_FILE, true)) {
                        models.build().writeTo(fos);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    models.clear();
                }
            }
            //remember to flush whatever is left in models after the loop as well

alternatively, each batch could be written to, and later read back from, its own stream.

                if(++rowcount >= 1_000_000){
                    rowcount=0;

                    //flush by batch
                    try (FileOutputStream fos = new FileOutputStream(CACHE_FILE + currentFileIndex++, true)) {
                        models.build().writeTo(fos);
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    models.clear();
                }

same for the read side:

            Files.list(Paths.get(Constants.CACHE_FILE_DIR + File.separator + Constants.PB_FILE))
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(file -> file.getName().startsWith(Constants.PB_FILE))
                    .parallel()
                    .map(file -> readFile(file))
                    .reduce(....)
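
for illustration, a sketch of what readFile and the reduce step could look like, using the Models wrapper message from the next section (the cache directory, file prefix and the ModelsClass outer class name are assumptions):

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.stream.Stream;

import models.ModelsClass;   // assumed generated outer class from the wrapper .proto

public class BatchedModelReader {

    // parse one batch file back into a Models message
    static ModelsClass.Models readFile(File file) {
        try (FileInputStream fis = new FileInputStream(file)) {
            return ModelsClass.Models.parseFrom(fis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (Stream<Path> paths = Files.list(Paths.get("cache"))) {          // cache directory is an assumption
            Optional<ModelsClass.Models> merged = paths
                    .filter(Files::isRegularFile)
                    .map(Path::toFile)
                    .filter(f -> f.getName().startsWith("models.pb"))        // file prefix is an assumption
                    .parallel()
                    .map(BatchedModelReader::readFile)
                    // merging two messages concatenates their repeated models field
                    .reduce((a, b) -> a.toBuilder().mergeFrom(b).build());

            merged.ifPresent(m -> System.out.println("total models: " + m.getModelsCount()));
        }
    }
}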

read and write multiple objects with protobuf

a simple trick to read and write multiple objects through protobuf is to create an additional wrapper message in the .proto

syntax = "proto3";

option java_package = "models";

message Model {
  int64 id = 1;
  string name = 2;
}

then wrap it

message Models {
  repeated Model models = 1;
}

then, instead of reading and writing the objects one by one through a java stream, they can be read and written in one go through Models.

ModelsClass.Models models = ModelsClass.Models.newBuilder().mergeFrom(fileInputStream).build();
models.writeTo(fileOutputStream);
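
a minimal round-trip sketch, assuming ModelsClass is the generated outer class and the file names are only placeholders:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

import models.ModelsClass;   // assumed generated outer class

public class ModelsRoundTrip {
    public static void main(String[] args) throws IOException {
        // wrap a batch of Model objects and write them out in one go
        List<ModelsClass.Model> batch = List.of(
                ModelsClass.Model.newBuilder().setId(1).setName("first").build(),
                ModelsClass.Model.newBuilder().setId(2).setName("second").build());

        ModelsClass.Models models = ModelsClass.Models.newBuilder()
                .addAllModels(batch)
                .build();

        try (FileOutputStream fos = new FileOutputStream("models.pb")) {    // file name is an assumption
            models.writeTo(fos);
        }

        // read them back in one go
        try (FileInputStream fis = new FileInputStream("models.pb")) {
            ModelsClass.Models loaded = ModelsClass.Models.parseFrom(fis);
            System.out.println("loaded " + loaded.getModelsCount() + " models");
        }
    }
}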

git ssh passphrases on windows

on *nix machines, the passphrase can easily be saved with the keychain.
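
on macOS, for example, it is roughly the following (the key path here is only an example):

# ~/.ssh/config
Host *
  AddKeysToAgent yes
  UseKeychain yes
  IdentityFile ~/.ssh/id_ed25519

# store the passphrase in the keychain (older macOS versions use ssh-add -K)
ssh-add --apple-use-keychain ~/.ssh/id_ed25519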

on windows machines, the key can be added to ssh-agent. then, while the ssh-agent is running, it saves the hassle of being asked for the passphrase on every git operation.

here are the steps:

in git bash, add the below to ~/.profile (or ~/.bashrc) to have the key added to ssh-agent when the shell starts:

env=~/.ssh/agent.env

agent_load_env () { test -f "$env" && . "$env" >| /dev/null ; }

agent_start () {
    (umask 077; ssh-agent >| "$env")
    . "$env" >| /dev/null ; }

agent_load_env

# agent_run_state: 0=agent running w/ key; 1=agent w/o key; 2=agent not running
agent_run_state=$(ssh-add -l >| /dev/null 2>&1; echo $?)

if [ ! "$SSH_AUTH_SOCK" ] || [ $agent_run_state = 2 ]; then
    agent_start
    ssh-add    # pass the path to the private key if needed, e.g. ssh-add ~/.ssh/gitlab
elif [ "$SSH_AUTH_SOCK" ] && [ $agent_run_state = 1 ]; then
    ssh-add    # pass the path to the private key if needed, e.g. ssh-add ~/.ssh/gitlab
fi

unset env

Ref: https://docs.github.com/en/free-pro-team@latest/github/authenticating-to-github/working-with-ssh-key-passphrases

lombok issue on intellij

When trying to run the project from intellij, it keeps complaining:

java.lang.ClassNotFoundException: lombok.javac.handlers.HandleLog$HandleCustomLog

Even though I can see the variable, for example log, being generated in the class.

Turns out this is due to the buggy maven plugin (the internal repo's cacerts not being trusted), which has put two copies of the lombok library on the path.

There are two places to remove the duplicate from:

  • the dependencies of the specific module (Project Structure → Modules → Dependencies)
  • the libraries (Project Structure → Libraries)

kibana: unable to create index

I encountered an error when trying to create an index, saying “Error forbidden”.

turns out the issue is with elasticsearch.

checking elasticsearch, there is an error showing operations blocked due to “read only”.

this not only blocks kibana from making updates, for example creating a new index, it blocks new logs from being pumped in as well.

and ultimately, this was set because the flood_stage disk watermark had been exceeded:

https://www.elastic.co/guide/en/elasticsearch/reference/6.8/disk-allocator.html

to sort out the issue, i cleared some old docker images and containers to free up disk space,

followed by a call to reset the read_only setting.
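
the reset call is along these lines, against the index settings API (the host and the _all index pattern are just examples):

curl -X PUT "localhost:9200/_all/_settings" \
  -H 'Content-Type: application/json' \
  -d '{"index.blocks.read_only_allow_delete": null}'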

which brought elasticsearch back to normal.

EFK: elasticsearch, fluentd, kibana

Set up the EFK stack in kubernetes:

service namespace

apiVersion: v1
kind: Namespace
metadata:
  name: core-service

elasticsearch

apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch
  labels:
    component: elasticsearch
spec:
  selector:
    matchLabels:
      component: elasticsearch
  template:
    metadata:
      labels:
        component: elasticsearch
    spec:
      containers:
        - name: elasticsearch
          image: docker.elastic.co/elasticsearch/elasticsearch:7.4.1
          env:
            - name: discovery.type
              value: single-node
          ports:
            - containerPort: 9200
              name: http
              protocol: TCP
          resources:
            limits:
              cpu: 500m
              memory: 4Gi
            requests:
              cpu: 500m
              memory: 4Gi
---
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  labels:
    service: elasticsearch
spec:
  type: NodePort
  selector:
    component: elasticsearch
  ports:
  - name: es
    port: 9200
    targetPort: 9200

fluentd and fluentd roles

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: core-service
  labels:
    k8s-app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1.3-debian-elasticsearch
          env:
            - name:  FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.default.svc"
            - name:  FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: "http"
            - name: FLUENT_UID
              value: "0"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd
  namespace: core-service

---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: fluentd
  namespace: core-service
rules:
  - apiGroups:
      - ""
    resources:
      - pods
      - namespaces
    verbs:
      - get
      - list
      - watch

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: fluentd
roleRef:
  kind: ClusterRole
  name: fluentd
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: fluentd
    namespace: core-service

kibana

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
spec:
  selector:
    matchLabels:
      run: kibana
  template:
    metadata:
      labels:
        run: kibana
    spec:
      containers:
        - name: kibana
          image: docker.elastic.co/kibana/kibana:7.4.1
          env:
            - name: ELASTICSEARCH_URL
              value: http://elasticsearch:9200
            - name: XPACK_SECURITY_ENABLED
              value: "true"
          ports:
            - containerPort: 5601
              name: http
              protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  labels:
    service: kibana
spec:
  type: NodePort
  selector:
    run: kibana
  ports:
  - name: kbai
    port: 5601
    targetPort: 5601

The connecting part between the components is the elasticsearch URL

elasticsearch.<namespace>.svc:9200

if accessed across namespaces, or

elasticsearch:9200

if within the same namespace
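
a quick way to sanity-check the DNS name from inside the cluster (the pod name and curl image here are arbitrary):

kubectl run curl-test --rm -it --restart=Never \
  --image=curlimages/curl --command -- \
  curl -s http://elasticsearch.default.svc:9200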

even though kibana's stack monitoring page expects filebeat, fluentd actually works well with elasticsearch for pumping in the logs.