Logstash is an open-source data processing pipeline that can simultaneously collect data from multiple sources, transform it, and send it to a destination of your choice. Logstash is part of the Elastic Stack (formerly known as the ELK Stack), alongside Elasticsearch and Kibana; the three tools are often used together to provide powerful search, analysis, and visualization capabilities. Lakehouse provides a Logstash output connector (logstash-output-clickzetta) that writes events directly to Lakehouse.

Deployment

# Uninstall Plugin
./bin/logstash-plugin uninstall logstash-output-clickzetta && rm -rf ./vendor/local_gems/

# Install Plugin
./bin/logstash-plugin install --no-verify --local logstash-output-clickzetta-0.0.1.gem 

# View Plugin
./bin/logstash-plugin list --verbose
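# Optionally confirm the connector is installed (a hedged check: simply filters the plugin list)
./bin/logstash-plugin list --verbose | grep clickzetta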

Test

cd /logstash-data
/usr/share/logstash/bin/logstash -f data/logstash.conf.5 --log.level error
# Pipeline workers (-w) default to the number of CPU cores, and the batch size (-b) defaults to 125; both can be raised for higher throughput.
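# For example (illustrative values; tune for your hardware and workload):
/usr/share/logstash/bin/logstash -f data/logstash.conf.5 --log.level error -w 16 -b 200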

File input

Refer to the Logstash file input plugin documentation.

input {
  file {
    path => "/logstash-data/data/igs_worker_full_log.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
  file {
    path => "/logstash-data/data/igs_worker_full_log_1.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
  file {
    path => "/logstash-data/data/igs_worker_full_log_2.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
  file {
    path => "/logstash-data/data/igs_worker_full_log_3.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
  file {
    path => "/logstash-data/data/igs_worker_full_log_4.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
}

output {
  clickzetta {
    jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
    username => "index_test"
    password => "password"
    schema => "public"
    table => "test_simple_data-%{+YYYY.MM.dd}"
    internalMode => false
    directMode => false
  }
}
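Since the five file inputs above differ only in the file name, note that the file plugin's path option also accepts glob patterns and arrays. A minimal sketch that collapses them into one block, assuming the igs_worker_full_log*.log pattern matches exactly the intended files:

input {
  file {
    # One input covering igs_worker_full_log.log through igs_worker_full_log_4.log
    path => "/logstash-data/data/igs_worker_full_log*.log"
    codec => json
    start_position => "beginning"
    mode => "read"
    file_completed_action => "log"
    file_completed_log_path => "/logstash-data/data/temp_log/"
    exit_after_read => true
    sincedb_clean_after => "1 second"
  }
}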

Kafka input

Refer to the Logstash kafka input plugin documentation.

input {
  kafka {
    bootstrap_servers => "alikafka-pre-cn-co92y9d22001-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-co92y9d22001-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-co92y9d22001-3.alikafka.aliyuncs.com:9093"
    group_id => "logstash-test-group"
    decorate_events => true
    topics => ["igs-worker-log-for-sla"]
    security_protocol => "SASL_SSL"
    sasl_mechanism => "PLAIN"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
    ssl_truststore_password => "KafkaOnsClient"
    ssl_truststore_location => "/logstash-data/only.4096.client.truststore.jks"
    ssl_endpoint_identification_algorithm => ""
    consumer_threads => 3
    codec => json {
      charset => "UTF-8"
    }
  }
}
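# Note: with decorate_events enabled, the kafka input also exposes topic, partition,
# and offset metadata under [@metadata][kafka], e.g. "%{[@metadata][kafka][topic]}".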

filter {
  mutate {
    # Capture the source topic name for use in the output table name
    add_field => {
      "kafka_topic_name" => "%{[fields][log_topic]}"
    }
  }
  grok {
    # Split the raw log line into timestamp, thread, level, source, function, and content
    match => { "message" => "\[%{TIMESTAMP_ISO8601:@timestamp}\] \[%{NUMBER:thread_id}\] \[%{LOGLEVEL:log_level}\] \[%{DATA:source_file}\] \[%{DATA:function_name}\]%{GREEDYDATA:log_content}" }
  }
  ruby {
    # Tag each event by thread_id parity so the output can route odd and even threads to different tables
    code => "
      if event.get('thread_id').to_i % 2 == 0
        event.tag('even')
      else
        event.tag('odd')
      end
    "
  }
}
output {
    if "odd" in [tags] {
        clickzetta {
            jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
            username => "index_test"
            password => "password"
            schema => "public"
            table => "liuwei_log_data-odd-%{kafka_topic_name}-%{+YYYY.MM.dd}"
            tablet => 16
            debug => true
            internalMode => false
            directMode => false
        }
    } else {
        clickzetta {
            jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
            username => "index_test"
            password => "password"
            schema => "public"
            table => "liuwei_log_data-even-%{kafka_topic_name}-%{+YYYY.MM.dd}"
            tablet => 16
            debug => true
            internalMode => false
            directMode => false
        }
    }
}
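To verify the grok fields and the even/odd tags before writing to Lakehouse, a temporary stdout output can be swapped in; a debugging sketch, not part of the connector setup:

output {
  # Print each event with its parsed fields and tags for inspection
  stdout { codec => rubydebug }
}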