ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring Cloud Data Flow
    Spring 2024. 10. 2. 15:09
    반응형

    공식 문서 사이트

    수동 다운로드 사이트

    작성일 기준으로 최신 버전은 2.11.4

    설명

    Spring Cloud Skipper

    Spring Cloud Data Flow와 함께 사용되는 배포 관리 도구로, 애플리케이션 배포의 관리 및 업그레이드를 간편하게 수행할 수 있도록 도와주며, Skipper는 Spring Cloud Data Flow에서 스트림 및 작업(Tasks)을 구성하고 실행하는데 있어 버전 관리, 롤백, 및 점진적 업그레이드와 같은 복잡한 배포 시나리오를 지원하는 역할을 한다.

    주요 역할 및 기능

    1. 버전 관리:

      • Skipper는 애플리케이션의 각 배포에 대해 버전을 관리하며 새로운 버전의 애플리케이션을 배포하거나, 문제가 발생했을 때 이전 버전으로 롤백을 할 수 있다.
    2. 점진적 업그레이드:

      • Skipper는 애플리케이션의 새로운 버전을 배포할 때, 한 번에 전체 시스템을 업그레이드하는 대신, 부분적으로 점진적으로 업그레이드할 수 있도록 지원한다.

        예를 들어, 스트림의 일부 인스턴스만 먼저 업그레이드한 후, 문제가 없으면 나머지 인스턴스도 업그레이드하는 방식으로 안정성을 높일 수 있다.

    3. 롤백 기능:

      • 새로운 버전 배포 후 문제가 발생할 경우, 이전 안정된 버전으로 빠르게 롤백할 수 있다. Skipper는 이 과정을 단순화하여 다운타임을 최소화.
    4. 상태 관리:

      • Skipper는 각 배포의 상태를 관리한다. 예를 들어, 현재 배포된 애플리케이션의 상태가 정상인지, 배포 과정에서 실패한 것은 없는지 등을 추적 가능.
    5. 추상화된 배포 환경:

      • Skipper는 Kubernetes, Cloud Foundry 등 다양한 환경에서 애플리케이션을 배포할 수 있도록 추상화된 인터페이스를 제공. 이를 통해 다양한 클라우드 및 온프레미스 환경에서 일관된 방식으로 애플리케이션 배포가 가능.

    Spring Cloud Dataflow Server

    데이터 스트림 처리배치 작업을 관리하고 오케스트레이션하는 플랫폼. 이 서버는 Spring Cloud Data Flow 생태계의 핵심 컴포넌트로서, 데이터 파이프라인을 설계, 배포, 모니터링 및 관리.

    주요 기능

    1. 데이터 스트림 처리:

      • Spring Cloud Data Flow Server는 스트림을 구성하고 실행. 스트림은 데이터가 소스에서 시작해 프로세서로 전달되고, 마지막으로 싱크로 전송되는 데이터 파이프라인을 나타냄.

        예를 들어, Kafka에서 데이터를 읽어와 이를 변환하고, 다시 데이터베이스에 저장하는 스트림을 구성할 수 있다.

    2. 배치 작업 관리:

      • 배치 작업은 일괄 처리가 필요한 데이터 작업을 정의.

        Spring Cloud Data Flow Server는 Spring Batch와 통합되어, 대량의 데이터를 일괄 처리하는 작업을 정의하고, 스케줄링하며, 관리할 수 있다.

    3. 구성 및 배포:

      • Spring Cloud Data Flow Server는 데이터 스트림과 배치 작업을 손쉽게 구성하고 배포할 수 있는 UI와 REST API를 제공.

        이를 통해 다양한 애플리케이션을 간편하게 연결하고, Kubernetes, Cloud Foundry, 또는 로컬 환경에 배포 가능.

    4. 모니터링 및 관리:

      • 서버는 실행 중인 스트림과 배치 작업의 상태를 모니터링하고, 로그를 분석하며, 메트릭스를 수집하여 실시간으로 관리할 수 있는 기능을 제공.

        이를 통해 애플리케이션의 성능과 건강 상태 추적 가능.

    5. 확장 가능성:

      • Spring Cloud Data Flow는 확장 가능한 아키텍처를 제공하여, 클라우드 환경에서도 대규모 데이터 처리 작업을 효율적으로 수행할 수 있으며 필요에 따라 노드를 추가하거나, 배포된 애플리케이션을 확장하여 더 많은 데이터를 처리할 수 있다.

    구성 요소

    1. Spring Cloud Stream:

      • 데이터 스트림을 구성하고, 메시지 기반의 통신을 관리하는 라이브러리.

        Kafka, RabbitMQ와 같은 메시징 시스템을 사용하여 스트림 내의 데이터를 전달.

    2. Spring Batch:

      • 대규모 데이터 처리를 위해 배치 작업을 정의하고 실행할 수 있는 프레임워크.

        데이터 변환, 로드, 저장 작업을 자동화하고, 이를 Spring Cloud Data Flow와 연동하여 관리.

    3. Skipper:

      • 애플리케이션 배포의 버전 관리, 점진적 업그레이드, 롤백 등을 수행하는 도구로, Spring Cloud Data Flow Server와 통합 사용.

    실행

    SCDF 설치 및 실행은

    • Local Machine
      • Docker
      • Manual
    • Cloud Foundry
    • Kubernetes

    로 가능하다.

    해당 글에 예제는 Stream은 제외한 Batch를 예제로 들며 설치 및 실행은 Local Machine-Manual 기준으로 되어있다.

    Manual은 Spring에서 제공하는 jar 파일을 다운로드 받아 Spring boot를 실행 하는 식으로 실행한다.

    SCDF를 이용한 Batch는 Scheduling 기능도 제공하나 Local은 제공되지 않고 Cloud Foundry, Kubernetes에서만 가능하다.(참고)

    참고에 따르면 Local인 경우엔

    • Quartz
    • @Scheduled

    구성으로 별도의 Scheduling를 구현 해야 한다. 그리고 SCDF는 2.11.x 이후 버전부터 Spring boot 3를 지원하기 시작하였고 본 예제는 SCDF 2.11.4 버전으로 되어있다.

    그리고 SCDF 제어를 위해 Spring Cloud Dataflow Server REST APISpring Cloud Dataflow Shell이 제공되는데 Shell 같은 경우엔 jar 파일을 별도로 실행해야 하기 때문에 예제에선 Shell이 아닌 REST API를 사용한다. REST API는 별도 설치 필요 없이 Spring Cloud Dataflow Server에 요청해서 제어를 할 수 있다.

    구성

    • Mysql 5.7
    • Java 21
    • Spring Batch 5.1.2

    Download

    현재 최신 버전인 2.11.4 기준으로 아래 2개를 다운로드 받는다.

    초기 작업

    SCDF 사용을 위해선 메타테이블을 생성해야 하는데 메타테이블은

    • Spring Batch 메타테이블
    • Task 메타테이블
    • Skipper 메타테이블

    들을 생성해야 하고 여기서 Spring Batch, Task 메타테이블인 경우 Spring boot 3을 위한 별도 메타테이블을 생성해야 한다. 해당 별도 메타테이블에는 prefix가 필요한데 prefix는 아래를 참고.

    • SCDF & TASK 메타테이블인 경우엔 BOOT3_TASK_
    • Batch 메타테이블인 경우엔 BOOT3_BATCH_

    해당 메타테이블의 사용 여부는 어플리케이션을 등록할 때 Spring Boot version 부분을 3.x로 설정하게 되면 테이블 명에 BOOT3이 있는 메타 테이블을 사용하고 2.x로 설정하면 기존 메타 테이블을 사용하며 해당 부분은 Spring에서 강제하기에 별도 설정은 없는 것으로 보인다.

    원래는 Spring Cloud Dataflow Server를 실행하면 메타테이블들을 자동으로 생성하는데 Mysql인 경우엔 이슈가 있어 실행 전에 별도로 메타테이블들을 생성해줘야 한다. 이슈에 대해선 해당 글에 있는 이슈 항목을 참고하고 메타테이블 생성 쿼리는 아래를 참고한다.

    -- V1-dataflow.sql
    create table if not exists hibernate_sequence (
        next_val bigint
    );
    
    insert into hibernate_sequence (next_val)
      select * from (select 1 as next_val) as temp
      where not exists(select * from hibernate_sequence);
    
    create table app_registration (
      id bigint not null,
      object_version bigint,
      default_version bit,
      metadata_uri longtext,
      name varchar(255),
      type integer,
      uri longtext,
      version varchar(255),
      primary key (id)
    );
    
    create table task_deployment (
      id bigint not null,
      object_version bigint,
      task_deployment_id varchar(255) not null,
      task_definition_name varchar(255) not null,
      platform_name varchar(255) not null,
      created_on datetime,
      primary key (id)
    );
    
    create table audit_records (
      id bigint not null,
      audit_action bigint,
      audit_data longtext,
      audit_operation bigint,
      correlation_id varchar(255),
      created_by varchar(255),
      created_on datetime,
      primary key (id)
    );
    
    create table stream_definitions (
      definition_name varchar(255) not null,
      definition longtext,
      primary key (definition_name)
    );
    
    create table task_definitions (
      definition_name varchar(255) not null,
      definition longtext,
      primary key (definition_name)
    );
    
    CREATE TABLE TASK_EXECUTION (
      TASK_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
      START_TIME DATETIME DEFAULT NULL,
      END_TIME DATETIME DEFAULT NULL,
      TASK_NAME  VARCHAR(100),
      EXIT_CODE INTEGER,
      EXIT_MESSAGE VARCHAR(2500),
      ERROR_MESSAGE VARCHAR(2500),
      LAST_UPDATED TIMESTAMP,
      EXTERNAL_EXECUTION_ID VARCHAR(255),
      PARENT_EXECUTION_ID BIGINT
    );
    
    CREATE TABLE TASK_EXECUTION_PARAMS (
      TASK_EXECUTION_ID BIGINT NOT NULL,
      TASK_PARAM VARCHAR(2500),
      constraint TASK_EXEC_PARAMS_FK foreign key (TASK_EXECUTION_ID)
      references TASK_EXECUTION(TASK_EXECUTION_ID)
    );
    
    CREATE TABLE TASK_TASK_BATCH (
      TASK_EXECUTION_ID BIGINT NOT NULL,
      JOB_EXECUTION_ID BIGINT NOT NULL,
      constraint TASK_EXEC_BATCH_FK foreign key (TASK_EXECUTION_ID)
      references TASK_EXECUTION(TASK_EXECUTION_ID)
    );
    
    CREATE TABLE TASK_SEQ (
      ID BIGINT NOT NULL,
      UNIQUE_KEY CHAR(1) NOT NULL,
      constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    );
    
    INSERT INTO TASK_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp;
    
    CREATE TABLE TASK_LOCK (
      LOCK_KEY CHAR(36) NOT NULL,
      REGION VARCHAR(100) NOT NULL,
      CLIENT_ID CHAR(36),
      CREATED_DATE DATETIME(6) NOT NULL,
      constraint LOCK_PK primary key (LOCK_KEY, REGION)
    );
    
    CREATE TABLE BATCH_JOB_INSTANCE (
      JOB_INSTANCE_ID BIGINT  NOT NULL PRIMARY KEY,
      VERSION BIGINT,
      JOB_NAME VARCHAR(100) NOT NULL,
      JOB_KEY VARCHAR(32) NOT NULL,
      constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
    );
    
    CREATE TABLE BATCH_JOB_EXECUTION (
      JOB_EXECUTION_ID BIGINT  NOT NULL PRIMARY KEY,
      VERSION BIGINT,
      JOB_INSTANCE_ID BIGINT NOT NULL,
      CREATE_TIME DATETIME NOT NULL,
      START_TIME DATETIME DEFAULT NULL,
      END_TIME DATETIME DEFAULT NULL,
      STATUS VARCHAR(10),
      EXIT_CODE VARCHAR(2500),
      EXIT_MESSAGE VARCHAR(2500),
      LAST_UPDATED DATETIME,
      JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
      constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
      references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
    );
    
    CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
      JOB_EXECUTION_ID BIGINT NOT NULL,
      TYPE_CD VARCHAR(6) NOT NULL,
      KEY_NAME VARCHAR(100) NOT NULL,
      STRING_VAL VARCHAR(250),
      DATE_VAL DATETIME DEFAULT NULL,
      LONG_VAL BIGINT,
      DOUBLE_VAL DOUBLE PRECISION,
      IDENTIFYING CHAR(1) NOT NULL,
      constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
      references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    );
    
    CREATE TABLE BATCH_STEP_EXECUTION (
      STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
      VERSION BIGINT NOT NULL,
      STEP_NAME VARCHAR(100) NOT NULL,
      JOB_EXECUTION_ID BIGINT NOT NULL,
      START_TIME DATETIME NOT NULL,
      END_TIME DATETIME DEFAULT NULL,
      STATUS VARCHAR(10),
      COMMIT_COUNT BIGINT,
      READ_COUNT BIGINT,
      FILTER_COUNT BIGINT,
      WRITE_COUNT BIGINT,
      READ_SKIP_COUNT BIGINT,
      WRITE_SKIP_COUNT BIGINT,
      PROCESS_SKIP_COUNT BIGINT,
      ROLLBACK_COUNT BIGINT,
      EXIT_CODE VARCHAR(2500),
      EXIT_MESSAGE VARCHAR(2500),
      LAST_UPDATED DATETIME,
      constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
      references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    );
    
    CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
      STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
      SHORT_CONTEXT VARCHAR(2500) NOT NULL,
      SERIALIZED_CONTEXT TEXT,
      constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
      references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
    );
    
    CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
      JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
      SHORT_CONTEXT VARCHAR(2500) NOT NULL,
      SERIALIZED_CONTEXT TEXT,
      constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
      references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
    );
    
    CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
      ID BIGINT NOT NULL,
      UNIQUE_KEY CHAR(1) NOT NULL,
      constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    );
    
    INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_STEP_EXECUTION_SEQ);
    
    CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
      ID BIGINT NOT NULL,
      UNIQUE_KEY CHAR(1) NOT NULL,
      constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    );
    
    INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_EXECUTION_SEQ);
    
    CREATE TABLE BATCH_JOB_SEQ (
      ID BIGINT NOT NULL,
      UNIQUE_KEY CHAR(1) NOT NULL,
      constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    );
    
    INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from BATCH_JOB_SEQ);
    
    -- V1-skipper.sql
    create table if not exists hibernate_sequence (
        next_val bigint
    );
    
    insert into hibernate_sequence (next_val)
      select * from (select 1 as next_val) as temp
      where not exists(select * from hibernate_sequence);
    
    create table skipper_app_deployer_data (
      id bigint not null,
      object_version bigint,
      deployment_data longtext,
      release_name varchar(255),
      release_version integer,
      primary key (id)
    );
    
    create table skipper_info (
      id bigint not null,
      object_version bigint,
      deleted datetime,
      description varchar(255),
      first_deployed datetime,
      last_deployed datetime,
      status_id bigint,
      primary key (id)
    );
    
    create table skipper_manifest (
      id bigint not null,
      object_version bigint,
      data longtext,
      primary key (id)
    );
    
    create table skipper_package_file (
      id bigint not null,
      package_bytes longblob,
      primary key (id)
    );
    
    create table skipper_package_metadata (
      id bigint not null,
      object_version bigint,
      api_version varchar(255),
      description longtext,
      display_name varchar(255),
      icon_url longtext,
      kind varchar(255),
      maintainer varchar(255),
      name varchar(255),
      origin varchar(255),
      package_home_url longtext,
      package_source_url longtext,
      repository_id bigint,
      repository_name varchar(255),
      sha256 varchar(255),
      tags longtext,
      version varchar(255),
      packagefile_id bigint,
      primary key (id)
    );
    
    create table skipper_release (
      id bigint not null,
      object_version bigint,
      config_values_string longtext,
      name varchar(255),
      package_metadata_id bigint,
      pkg_json_string longtext,
      platform_name varchar(255),
      repository_id bigint,
      version integer not null,
      info_id bigint,
      manifest_id bigint,
      primary key (id)
    );
    
    create table skipper_repository (
      id bigint not null,
      object_version bigint,
      description varchar(255),
      local bit,
      name varchar(255),
      repo_order integer,
      source_url longtext,
      url longtext,
      primary key (id)
    );
    
    create table skipper_status (
      id bigint not null,
      platform_status longtext,
      status_code varchar(255),
      primary key (id)
    );
    
    create table action (
      id bigint not null,
      name varchar(255),
      spel varchar(255),
      primary key (id)
    );
    
    create table deferred_events (
      jpa_repository_state_id bigint not null,
      deferred_events varchar(255)
    );
    
    create table guard (
      id bigint not null,
      name varchar(255),
      spel varchar(255),
      primary key (id)
    );
    
    create table state (
      id bigint not null,
      initial_state bit not null,
      kind integer,
      machine_id varchar(255),
      region varchar(255),
      state varchar(255),
      submachine_id varchar(255),
      initial_action_id bigint,
      parent_state_id bigint,
      primary key (id)
    );
    
    create table state_entry_actions (
      jpa_repository_state_id bigint not null,
      entry_actions_id bigint not null,
      primary key (jpa_repository_state_id, entry_actions_id)
    );
    
    create table state_exit_actions (
      jpa_repository_state_id bigint not null,
      exit_actions_id bigint not null,
      primary key (jpa_repository_state_id, exit_actions_id)
    );
    
    create table state_state_actions (
      jpa_repository_state_id bigint not null,
      state_actions_id bigint not null,
      primary key (jpa_repository_state_id, state_actions_id)
    );
    
    create table state_machine (
      machine_id varchar(255) not null,
      state varchar(255),
      state_machine_context longblob,
      primary key (machine_id)
    );
    
    create table transition (
      id bigint not null,
      event varchar(255),
      kind integer,
      machine_id varchar(255),
      guard_id bigint,
      source_id bigint,
      target_id bigint,
      primary key (id)
    );
    
    create table transition_actions (
      jpa_repository_transition_id bigint not null,
      actions_id bigint not null,
      primary key (jpa_repository_transition_id, actions_id)
    );
    
    create index idx_pkg_name on skipper_package_metadata (name);
    
    create index idx_rel_name on skipper_release (name);
    
    create index idx_repo_name on skipper_repository (name);
    
    alter table skipper_repository
      add constraint uk_repository unique (name);
    
    alter table deferred_events
      add constraint fk_state_deferred_events
      foreign key (jpa_repository_state_id)
      references state (id);
    
    alter table skipper_info
      add constraint fk_info_status
      foreign key (status_id)
      references skipper_status (id);
    
    alter table skipper_package_metadata
      add constraint fk_package_metadata_pfile
      foreign key (packagefile_id)
      references skipper_package_file (id);
    
    alter table skipper_release
      add constraint fk_release_info
      foreign key (info_id)
      references skipper_info (id);
    
    alter table skipper_release
      add constraint fk_release_manifest
      foreign key (manifest_id)
      references skipper_manifest (id);
    
    alter table state
      add constraint fk_state_initial_action
      foreign key (initial_action_id)
      references action (id);
    
    alter table state
      add constraint fk_state_parent_state
      foreign key (parent_state_id)
      references state (id);
    
    alter table state_entry_actions
      add constraint fk_state_entry_actions_a
      foreign key (entry_actions_id)
      references action (id);
    
    alter table state_entry_actions
      add constraint fk_state_entry_actions_s
      foreign key (jpa_repository_state_id)
      references state (id);
    
    alter table state_exit_actions
      add constraint fk_state_exit_actions_a
      foreign key (exit_actions_id)
      references action (id);
    
    alter table state_exit_actions
      add constraint fk_state_exit_actions_s
      foreign key (jpa_repository_state_id)
      references state (id);
    
    alter table state_state_actions
      add constraint fk_state_state_actions_a
      foreign key (state_actions_id)
      references action (id);
    
    alter table state_state_actions
      add constraint fk_state_state_actions_s
      foreign key (jpa_repository_state_id)
      references state (id);
    
    alter table transition
      add constraint fk_transition_guard
      foreign key (guard_id)
      references guard (id);
    
    alter table transition
      add constraint fk_transition_source
      foreign key (source_id)
      references state (id);
    
    alter table transition
      add constraint fk_transition_target
      foreign key (target_id)
      references state (id);
    
    alter table transition_actions
      add constraint fk_transition_actions_a
      foreign key (actions_id)
      references action (id);
    
    alter table transition_actions
      add constraint fk_transition_actions_t
      foreign key (jpa_repository_transition_id)
      references transition (id);
    
    
    -- V2-dataflow.sql
    alter table stream_definitions add description varchar(255);
    
    alter table stream_definitions add original_definition longtext;
    
    alter table task_definitions add description varchar(255);
    
    CREATE TABLE task_execution_metadata (
      id BIGINT NOT NULL,
      task_execution_id BIGINT NOT NULL,
      task_execution_manifest LONGTEXT,
      primary key (id),
      CONSTRAINT TASK_METADATA_FK FOREIGN KEY (task_execution_id)
      REFERENCES TASK_EXECUTION(TASK_EXECUTION_ID)
    );
    
    CREATE TABLE task_execution_metadata_seq (
      ID BIGINT NOT NULL,
      UNIQUE_KEY CHAR(1) NOT NULL,
      constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    );
    
    INSERT INTO task_execution_metadata_seq (ID, UNIQUE_KEY) select * from (select 0 as ID, '0' as UNIQUE_KEY) as tmp where not exists(select * from task_execution_metadata_seq);
    
    -- V2-dataflow-after.sql
    update stream_definitions set original_definition=definition;
    
    -- V3-dataflow.sql
    alter table audit_records add platform_name varchar(255);
    
    -- V4-dataflow.sql
    create index STEP_NAME_IDX on BATCH_STEP_EXECUTION (STEP_NAME);
    
    -- V5-dataflow.sql
    create index TASK_EXECUTION_ID_IDX on TASK_EXECUTION_PARAMS (TASK_EXECUTION_ID);
    
    -- V6-dataflow.sql
    alter table app_registration add boot_version varchar(16);
    
    -- V7-dataflow.sql
    create table if not exists BOOT3_TASK_EXECUTION
    (
        TASK_EXECUTION_ID     BIGINT NOT NULL PRIMARY KEY,
        START_TIME            DATETIME(6) DEFAULT NULL,
        END_TIME              DATETIME(6) DEFAULT NULL,
        TASK_NAME             VARCHAR(100),
        EXIT_CODE             INTEGER,
        EXIT_MESSAGE          VARCHAR(2500),
        ERROR_MESSAGE         VARCHAR(2500),
        LAST_UPDATED          TIMESTAMP,
        EXTERNAL_EXECUTION_ID VARCHAR(255),
        PARENT_EXECUTION_ID   BIGINT
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_TASK_EXECUTION_PARAMS
    (
        TASK_EXECUTION_ID BIGINT NOT NULL,
        TASK_PARAM        VARCHAR(2500),
        constraint BOOT3_TASK_EXEC_PARAMS_FK foreign key (TASK_EXECUTION_ID)
            references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_TASK_TASK_BATCH
    (
        TASK_EXECUTION_ID BIGINT NOT NULL,
        JOB_EXECUTION_ID  BIGINT NOT NULL,
        constraint BOOT3_TASK_EXEC_BATCH_FK foreign key (TASK_EXECUTION_ID)
            references BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_TASK_SEQ
    (
        ID         BIGINT  NOT NULL,
        UNIQUE_KEY CHAR(1) NOT NULL,
        constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    
    INSERT INTO BOOT3_TASK_SEQ (ID, UNIQUE_KEY)
    select *
    from (select 0 as ID, '0' as UNIQUE_KEY) as tmp;
    
    create table if not exists BOOT3_TASK_LOCK
    (
        LOCK_KEY     CHAR(36)     NOT NULL,
        REGION       VARCHAR(100) NOT NULL,
        CLIENT_ID    CHAR(36),
        CREATED_DATE DATETIME(6) NOT NULL,
        constraint BOOT3_LOCK_PK primary key (LOCK_KEY, REGION)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_TASK_EXECUTION_METADATA
    (
        ID                      BIGINT NOT NULL,
        TASK_EXECUTION_ID       BIGINT NOT NULL,
        TASK_EXECUTION_MANIFEST TEXT,
        primary key (ID),
        CONSTRAINT BOOT3_TASK_METADATA_FK FOREIGN KEY (TASK_EXECUTION_ID) REFERENCES BOOT3_TASK_EXECUTION (TASK_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_TASK_EXECUTION_METADATA_SEQ
    (
        ID         BIGINT  NOT NULL,
        UNIQUE_KEY CHAR(1) NOT NULL,
        constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    INSERT INTO BOOT3_TASK_EXECUTION_METADATA_SEQ (ID, UNIQUE_KEY)
    select *
    from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
    where not exists(select * from BOOT3_TASK_EXECUTION_METADATA_SEQ);
    
    create table if not exists BOOT3_BATCH_JOB_INSTANCE
    (
        JOB_INSTANCE_ID BIGINT       NOT NULL PRIMARY KEY,
        VERSION         BIGINT,
        JOB_NAME        VARCHAR(100) NOT NULL,
        JOB_KEY         VARCHAR(32)  NOT NULL,
        constraint BOOT3_JOB_INST_UN unique (JOB_NAME, JOB_KEY)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_JOB_EXECUTION
    (
        JOB_EXECUTION_ID BIGINT       NOT NULL PRIMARY KEY,
        VERSION          BIGINT,
        JOB_INSTANCE_ID  BIGINT       NOT NULL,
        CREATE_TIME      DATETIME(6) NOT NULL,
        START_TIME       DATETIME(6) DEFAULT NULL,
        END_TIME         DATETIME(6) DEFAULT NULL,
        STATUS           VARCHAR(10),
        EXIT_CODE        VARCHAR(2500),
        EXIT_MESSAGE     VARCHAR(2500),
        LAST_UPDATED     DATETIME(6),
        constraint BOOT3_JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
            references BOOT3_BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_JOB_EXECUTION_PARAMS
    (
        JOB_EXECUTION_ID BIGINT       NOT NULL,
        PARAMETER_NAME   VARCHAR(100) NOT NULL,
        PARAMETER_TYPE   VARCHAR(100) NOT NULL,
        PARAMETER_VALUE  VARCHAR(2500),
        IDENTIFYING      CHAR(1)      NOT NULL,
        constraint BOOT3_JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
            references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_STEP_EXECUTION
    (
        STEP_EXECUTION_ID  BIGINT       NOT NULL PRIMARY KEY,
        VERSION            BIGINT       NOT NULL,
        STEP_NAME          VARCHAR(100) NOT NULL,
        JOB_EXECUTION_ID   BIGINT       NOT NULL,
        CREATE_TIME        DATETIME(6) NOT NULL,
        START_TIME         DATETIME(6) DEFAULT NULL,
        END_TIME           DATETIME(6) DEFAULT NULL,
        STATUS             VARCHAR(10),
        COMMIT_COUNT       BIGINT,
        READ_COUNT         BIGINT,
        FILTER_COUNT       BIGINT,
        WRITE_COUNT        BIGINT,
        READ_SKIP_COUNT    BIGINT,
        WRITE_SKIP_COUNT   BIGINT,
        PROCESS_SKIP_COUNT BIGINT,
        ROLLBACK_COUNT     BIGINT,
        EXIT_CODE          VARCHAR(2500),
        EXIT_MESSAGE       VARCHAR(2500),
        LAST_UPDATED       DATETIME(6),
        constraint BOOT3_JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
            references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_STEP_EXECUTION_CONTEXT
    (
        STEP_EXECUTION_ID  BIGINT        NOT NULL PRIMARY KEY,
        SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
        SERIALIZED_CONTEXT TEXT,
        constraint BOOT3_STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
            references BOOT3_BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_JOB_EXECUTION_CONTEXT
    (
        JOB_EXECUTION_ID   BIGINT        NOT NULL PRIMARY KEY,
        SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
        SERIALIZED_CONTEXT TEXT,
        constraint BOOT3_JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
            references BOOT3_BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
    ) ENGINE=InnoDB;
    
    create table if not exists BOOT3_BATCH_STEP_EXECUTION_SEQ
    (
        ID         BIGINT  NOT NULL,
        UNIQUE_KEY CHAR(1) NOT NULL,
        constraint BOOT3_UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    
    INSERT INTO BOOT3_BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
    select *
    from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
    where not exists(select * from BOOT3_BATCH_STEP_EXECUTION_SEQ);
    
    create table if not exists BOOT3_BATCH_JOB_EXECUTION_SEQ
    (
        ID         BIGINT  NOT NULL,
        UNIQUE_KEY CHAR(1) NOT NULL,
        constraint BOOT3_UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    
    INSERT INTO BOOT3_BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
    select *
    from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
    where not exists(select * from BOOT3_BATCH_JOB_EXECUTION_SEQ);
    
    create table if not exists BOOT3_BATCH_JOB_SEQ
    (
        ID         BIGINT  NOT NULL,
        UNIQUE_KEY CHAR(1) NOT NULL,
        constraint UNIQUE_KEY_UN unique (UNIQUE_KEY)
    ) ENGINE=InnoDB;
    
    INSERT INTO BOOT3_BATCH_JOB_SEQ (ID, UNIQUE_KEY)
    select *
    from (select 0 as ID, '0' as UNIQUE_KEY) as tmp
    where not exists(select * from BOOT3_BATCH_JOB_SEQ);
    
    -- V9-dataflow.sql
    CREATE VIEW AGGREGATE_TASK_EXECUTION AS
    SELECT TASK_EXECUTION_ID, START_TIME, END_TIME, TASK_NAME, EXIT_CODE, EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID, PARENT_EXECUTION_ID, 'boot2' AS SCHEMA_TARGET FROM TASK_EXECUTION
    UNION ALL
    SELECT TASK_EXECUTION_ID, START_TIME, END_TIME, TASK_NAME, EXIT_CODE, EXIT_MESSAGE, ERROR_MESSAGE, LAST_UPDATED, EXTERNAL_EXECUTION_ID, PARENT_EXECUTION_ID, 'boot3' AS SCHEMA_TARGET FROM BOOT3_TASK_EXECUTION;
    
    CREATE VIEW AGGREGATE_TASK_EXECUTION_PARAMS AS
    SELECT TASK_EXECUTION_ID, TASK_PARAM, 'boot2' AS SCHEMA_TARGET FROM TASK_EXECUTION_PARAMS
    UNION ALL
    SELECT TASK_EXECUTION_ID, TASK_PARAM, 'boot3' AS SCHEMA_TARGET FROM BOOT3_TASK_EXECUTION_PARAMS;
    
    CREATE VIEW AGGREGATE_JOB_EXECUTION AS
    SELECT JOB_EXECUTION_ID, VERSION, JOB_INSTANCE_ID, CREATE_TIME, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, LAST_UPDATED, 'boot2' AS SCHEMA_TARGET FROM BATCH_JOB_EXECUTION
    UNION ALL
    SELECT JOB_EXECUTION_ID, VERSION, JOB_INSTANCE_ID, CREATE_TIME, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, LAST_UPDATED, 'boot3' AS SCHEMA_TARGET FROM BOOT3_BATCH_JOB_EXECUTION;
    
    CREATE VIEW AGGREGATE_JOB_INSTANCE AS
    SELECT JOB_INSTANCE_ID, VERSION, JOB_NAME, JOB_KEY, 'boot2' AS SCHEMA_TARGET FROM BATCH_JOB_INSTANCE
    UNION ALL
    SELECT JOB_INSTANCE_ID, VERSION, JOB_NAME, JOB_KEY, 'boot3' AS SCHEMA_TARGET FROM BOOT3_BATCH_JOB_INSTANCE;
    
    CREATE VIEW AGGREGATE_TASK_BATCH AS
    SELECT TASK_EXECUTION_ID, JOB_EXECUTION_ID, 'boot2' AS SCHEMA_TARGET FROM TASK_TASK_BATCH
    UNION ALL
    SELECT TASK_EXECUTION_ID, JOB_EXECUTION_ID, 'boot3' AS SCHEMA_TARGET FROM BOOT3_TASK_TASK_BATCH;
    
    CREATE VIEW AGGREGATE_STEP_EXECUTION AS
    SELECT STEP_EXECUTION_ID, VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, EXIT_CODE, EXIT_MESSAGE, LAST_UPDATED, 'boot2' AS SCHEMA_TARGET FROM BATCH_STEP_EXECUTION
    UNION ALL
    SELECT STEP_EXECUTION_ID, VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, EXIT_CODE, EXIT_MESSAGE, LAST_UPDATED, 'boot3' AS SCHEMA_TARGET FROM BOOT3_BATCH_STEP_EXECUTION;
    
    -- V8-dataflow.sql
    RENAME TABLE task_execution_metadata TO task_execution_metadata_lc;
    RENAME TABLE task_execution_metadata_lc TO TASK_EXECUTION_METADATA;
    RENAME TABLE task_execution_metadata_seq TO task_execution_metadata_seq_lc;
    RENAME TABLE task_execution_metadata_seq_lc TO TASK_EXECUTION_METADATA_SEQ;
    
    -- V10-dataflow.sql
    create index BATCH_STEP_EXECUTION_JOB_EXECUTION_ID_IX on BATCH_STEP_EXECUTION(JOB_EXECUTION_ID);
    create index BOOT3_BATCH_STEP_EXECUTION_JOB_EXECUTION_ID_IX on BOOT3_BATCH_STEP_EXECUTION(JOB_EXECUTION_ID);
    create index BOOT3_TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on BOOT3_TASK_TASK_BATCH(JOB_EXECUTION_ID);
    create index TASK_TASK_BATCH_JOB_EXECUTION_ID_IX on TASK_TASK_BATCH(JOB_EXECUTION_ID);
    create index BATCH_JOB_EXECUTION_START_TIME_IX on BATCH_JOB_EXECUTION(START_TIME);
    create index BOOT3_BATCH_JOB_EXECUTION_START_TIME_IX on BOOT3_BATCH_JOB_EXECUTION(START_TIME);
    
    -- V11-dataflow.sql
    create index TASK_EXECUTION_PARENT_IX on TASK_EXECUTION(PARENT_EXECUTION_ID);
    create index BOOT3_TASK_EXECUTION_PARENT_IX on BOOT3_TASK_EXECUTION(PARENT_EXECUTION_ID);

    서버 구동

    서버는 Spring Cloud Dataflow, Skipper를 실행한다. 실행은 java -jar로 하며 java 버전은 어플리케이션를 빌드한 java 버전과 동일한 java 버전으로 실행해야 한다. 즉 어플리케이션을 java 21로 빌드 했다면 서버도 java 21로 실행해야 한다.

    그리고 실행 전에 메타테이블을 생성하였기에 자동 생성 방지를 위해 아래 옵션을 추가해서 실행한다.

    --spring.flyway.enabled=false

    Spring Cloud Dataflow Server

    java -jar spring-cloud-dataflow-server-2.11.4.jar \
    --spring.datasource.url=`DB 주소`\ 
    --spring.datasource.username=`DB 계정 ID`\ 
    --spring.datasource.password=`DB 계정 비밀번호`\ 
    --spring.datasource.driver-class-name=`DB Driver Class`\ 
    --spring.flyway.enabled=false

    해당 예제 글에선 Skipper 서버도 같은 곳에서 실행하기 때문에 별도 설정이 없지만 만약 Skipper 서버를 별도로 한다면 아래 옵션을 추가한다.

    --spring.cloud.skipper.client.serverUri=`Skipper주소/api`

    Spring Cloud Skipper Server

    java.exe -jar spring-cloud-skipper-server-2.11.4.jar \
    --spring.datasource.url=`DB 주소`\ 
    --spring.datasource.username=`DB 계정 ID`\ 
    --spring.datasource.password=`DB 계정 비밀번호`\ 
    --spring.datasource.driver-class-name=`DB Driver Class`\ 
    --spring.flyway.enabled=false

    예제 어플리케이션

    예제는 Spring Cloud Dataflow에서 제공하는 Batch 예제 어플리케이션를 참고하여 Spring Batch 5 전환 및 Gradle로 변경된 예제 어플리케이션을 사용한다. 해당 Git에서 받은 다음 Gradle로 빌드한다. 빌드된 jar 파일은 별도 경로에 위치 시켜두고 예제에 대해서 간략하게 설명 하자면 아래와 같다.

    • bill-setup-task
      • Spring Cloud Task 프로젝트며 예제 테이블을 생성한다.
    • bill-run
      • Spring Batch 5 + Task 프로젝트며 위 bill-setup-task에서 생성한 테이블에 데이터를 저장한다.

    2개의 프로젝트를 SCDF의 Applications에 등록 한 후 파이프라인(Tasks)를 생성해서 실행하는 예제이다.

    여기서 빌드된 파일명에 주의 해야 할 점은 SCDF에선 Version 관리를 위해 파일명에 하이픈(-)를 구분으로 버전명을 명시해줘야 한다. 예) bill-run-1.0.0.jar

    Application 등록

    Spring Cloud Dataflow Server와 Skipper 서버를 구동 시킨 후 http://localhost:9393/dashboard로 접속한다. 왼쪽 메뉴 중 Applications를 클릭 한 후 컨텐츠 상단에 ADD APPLICATION(S) 버튼을 클릭하면 아래 이미지와 같은 화면이 보인다.

    https://i.ibb.co/5FWmvXj/3.png

    3개의 메뉴를 통해 Application을 등록할 수 있는데 해당 예제에선 Register one or more applications를 사용해서 빌드된 jar 파일을 등록하도록 한다. 마지막 Import application starters from dataflow.spring.io. 메뉴는 Spring에서 제공하는 각종 starters를 등록할 수 있는 메뉴이다.

    Register one or more applications를 펼쳐보면 Application를 등록하는 화면이 나타나는데 입력은 아래와 같이 한다.

    • Name → 생성할 어플리케이션 이름 입력
    • Type → task 선택
    • Spring Boot version → Spring Boot 3.x 선택
    • URI → 빌드된 jar 파일의 경로를 입력하는데 이때 프로토콜을 지정해줘야 하며 프로토콜은 file:로 해서 빌드된 jar 파일의 절대경로로 입력한다. 예) file:/home/sinnake/bill-setup-task-1.0.0.jar

    URI 같은 경우엔 설명에도 나와있듯이 maven/docker/file등으로 할 수 있으며 각 종류에 맞게 프로토콜를 필히 같이 입력해야 한다.

    • maven → maven:
    • docker → docker:
    • file → file:

    만약 등록된 어플리케이션이 새로운 버전으로 빌드 되어 등록해야 한다면 Name을 기 등록된 어플리케이션의 이름과 동일하게 하고 파일명에 버전을 명시한 후 등록하면 아래와 같이 버전을 선택할 수 있게 바뀌게 된다. 이때 파일명엔 버전을 -(하이픈)으로 구분해줘야 한다.

    예) bill-setup-task-버전명시.jar → bill-setup-task-1.0.2.jar

    https://i.ibb.co/gT3RyyN/2.png

    이후 Tasks에서 추가된 버전을 사용하기 위해선 Applications에서 추가된 버전의 Task 명을 클릭한 다음 MANAGE VERSIONS를 통해 기본 값으로 설정하거나 Tasks에서 LAUNCH TASK로 실행할 때 버전을 선택해서 실행하면 된다.

    REST API

    만약 매번 화면에서 하기 어려우면 REST API 호출로 간편하게 실행할 수 있다. 관련 REST API는 참고.

    • 어플리케이션 추가 API

    /app// URI 형식인 POST 방식으로 Spring Boot Version 설정은 쿼리스트링에 bootVersion(2 혹은 3)으로 지정하면 되고 uri는 값으로 설정한다.

    curl 'http://localhost:9393/apps/task/bill-setup-task?bootVersion=3&force=true' \
    -i -X POST -d \
    'uri=file:/home/sinnake/scdf/bill-setup-task-1.0.1.jar'

    위 예제는 file 프로토콜로 bill-setup-task jar를 사용하고 어플리케이션 이름은 bill-setup-task로 지정하고 type은 task인 어플리케이션을 추가하는 호출이다. 쿼리스트링 중 force 값이 있는데 같은 버전을 추가 했을 경우 에러가 발생하는데 해당 값을 true로 하게 되면 에러가 발생하지 않는다.

    새롭게 빌드 된 버전의 어플리케이션을 추가할 경우도 동일하며 uri에 신규 버전인 어플리케이션을 지정하면 된다.

    최종적으로 위 화면과 동일한 어플리케이션이 등록되고 관련 REST API는 참고

    • 어플리케이션 기본 버전 지정 API

    /apps///형식인 PUT방식으로 type, name은 위 추가 API와 동일하며 version은 등록된 어플리케이션 중 사용하려는 version를 지정하면 된다.

    관련 REST API는 참고

    curl 'http://localhost:9393/apps/task/bill-setup-task/1.0.1' \
    -i -X PUT -H \
    'Accept: application/json'

    Task 생성

    실제 어플리케이션에 대한 파이프라인을 생성하는 부분이며 Tasks / Jobs 메뉴의 Tasks를 클릭한 후 컨텐츠 페이지의 최상단에 CREATE TASK 버튼을 클릭한다.

    https://i.ibb.co/Cnxkx2r/3.png

    왼쪽 영역에는 type이 task로 지정된 어플리케이션들의 목록이 나오며 드래그앤드랍을 활용해서 오른쪽 화면 영역에 파이프라인을 지정한다.

    이미지의 예제는 Spring Cloud Task인 bill-setup-task를 실행 한 후 Spring Batch 5인 bill-run을 실행하는 파이프라인이다.

    Task 생성 역시 REST API로 할 수 있다. 단 데이터로 이미지와 같은 흐름을 하기에는 복잡하니 화면을 이용하는 게 좋은 방법 일수도 있다.

    Task 실행

    Builder

    Task가 정상적으로 등록되었다면 아래 이미지와 같이 등록된 Task 항목이 보여진다. 항목이 1개만 생길 것이라 생각했는데 실상은 총 3개가 생긴다. 직접 생성한 Task와 이 Task에 사용되는 Task 항목까지 총 3개가 생기며 분류는 Definition 열을 참고하면 직접 생성한 Task를 확인할 수 있다.

    https://i.ibb.co/Yydqs4v/2024-09-03-105335.png

    Name 열에 직접 생성한 Task 명을 클릭한 후 전환된 페이지의 상단에 LAUNCH TASK 버튼을 클릭한다. 클릭하면 아래 이미지와 같은 화면이 출력 된다.

    https://i.ibb.co/2gM363g/2024-09-03-130939.png

    위 옵션 중 일반적으로 가장 많이 쓸 옵션이 Applications Properties와 Arguments 옵션이지 않을까 한다. 그 외 옵션은 인프라와 관련된 옵션들이 대부분이다. Deployment Platform 옵션 종류는 EDIT 버튼을 클릭하면 위 Platform에서 지정한 플랫폼과 관련된 옵션들의 목록을 확인할 수 있다.

    옵션은 전역과 각 Task마다 설정할 수 있으며 전역으로 설정하고 싶으면 왼쪽에 컬럼명이 없는 열과 Global 열에 설정하면 된다.

    해당 옵션들은 Task를 매번 실행시킬 때 마다 입력할 필요는 없다. 최초 1회만 입력하면 DB에 저장되기 때문에 이 후엔 입력이 필요 없이 실행할 수 있다. 단 Arguments 경우엔 저장하지 않기 때문에 매번 입력해야 한다.

    예제의 bill-run Task는 실행시킬 Job를 지정해야 하기 때문에 Applications Properties에 설정하였다. 위 이미지와 같이 하면 SCDF에선 프로퍼티 키에 값이 있는 Task에서만 해당 값을 설정하고 값이 없는 Task엔 설정하지 않는다.

    설정이 완료 되었으면 하단에 LAUNCH TASK 버튼을 클릭하면 파이프라인이 실행된다.

    Freetext

    Freetext 메뉴에선 화면의 Input이 아닌 프로퍼티를 이용해서 Task 실행 설정이 가능하다.

    https://i.ibb.co/rtByQxg/2024-09-03-132223.png

    위 이미지의 설정은 Builder에서 한 설정과 동일하고 다른 점이라면 Generic Deployer Properties를 추가 설정한 프로퍼티이다.

    텍스트 박스는 2개로 나뉘며 첫 번째 텍스트 박스는 Builder 기준으로 Arguments 옵션을 제외한 모든 옵션을 설정하는 영역이고 두 번째 텍스트 박스는 Arguments 옵션만 설정하는 영역이다.

    프로퍼티 규칙으로는 프로퍼티의 첫 번째 문자열은

    • app → Applications Properties
    • deployer → Generic Deployer Properties, Deployment Platform
      • Platform, Ctr Properties도 포함 되는지는 확인 못함.

    를 뜻하고 첫 번째 마침표 다음으로 적용할 Task 이름을 명시한다 여기서 *은 전역을 뜻하고 두 번째 마침표 이후 부터가 실제 Task에 적용될 프로퍼티명이다.

    만약 작성하기 어려우면 Builder로 먼저 Task를 실행 한 후 다시 접속해서 Freetext를 확인해보면 자동 채움 되어 있으니 작성 시 참고하도록 한다.

    Arguments 영역은 deployer의 사용 여부까진 확인하진 못했지만 deployer가 아닌 app를 동일한 규칙으로 사용하면 된다. 단 Arguments의 값은 커맨드 라인 매개변수 형태로 지정해야 한다.

    예) app.bill-run.jobName=--spring.batch.job.name=BillJob

    설정이 완료 되었으면 하단에 LAUNCH TASK 버튼을 클릭하면 파이프라인이 실행된다.

    REST API

    Task 실행 역시 Rest API로 가능하다. 관련 REST API는 참고

    /tasks/executions/launch uri에 POST 방식으로 호출하면 되고 Request parameter는

    • name → 실행 시킬 Task definition 명
    • properties → Freetext의 첫 번째 텍스트 박스에 사용되는 프로퍼티
    • arguments → Freetext의 두 번째 텍스트 박스에 사용되는 프로퍼티

    각 Request parameter 구분은 &로 한다.

    properties 설정 예

    각 설정은 ,(쉼표)로 구분 한다.

    param=$(
    echo -n 'name=bill' && \
    echo -n '&properties=' && \
    echo -n 'deployer.*.cpu=1' && \
    echo -n ',deployer.*.memory=256' && \
    echo -n ',deployer.*.disk=300' && \
    echo -n ',app.bill-run.spring.batch.job.name=BillJob'
    )
    
    curl 'http://localhost:9393/tasks/executions/launch' -i -X POST \
            -d $param

    arguments 설정 예

    arguments는 properties와 다르게 %20(공백)으로 구분 한다.

    args_param=$(
    echo -n 'name=bill' && \
    echo -n 'arguments=' && \
    echo -n 'app.bill-run.jobName=--spring.batch.job.name=BillJob' && \
    echo -n '%20app.bill-run.gogo=--spring.gogo=house'
    )
    
    curl 'http://localhost:9393/tasks/executions/launch' -i -X POST \
            -d $args_param

    이슈

    2.11.x 이상 버전의 mysql용 메타 테이블 생성 쿼리

    mysql에는 datetime2 타입이 없는데 해당 타입으로 생성하는 쿼리가 존재. 메타 테이블 정보는 참고

    문제의 sql 파일은

    • V1-dataflow.sql
      • TASK_LOCK
    • V7-dataflow.sql
      • BOOT3_TASK_EXECUTION
      • BOOT3_TASK_LOCK
      • BOOT3_BATCH_JOB_EXECUTION
      • BOOT3_BATCH_STEP_EXECUTION

    init-mysql 프로파일 실행 문제

    초기에 SCDF를 실행하면 메타 테이블들을 생성하는데 이때 자동이 아닌 수동으로 생성하고 싶으면 README.MD를 참고한다.

    사용하는 방법은 SCDF를 실행할 때 spring.profiles.active 옵션을 아래 중 하나로 설정하고 실행하면 된다.

    • init-db2
    • init-mariadb
    • init-mysql
    • init-oracle
    • init-postresql
    • init-sqlserver

    여기서 mysql 메타 테이블을 생성하는 init-mysql에 문제가 있다. init-mysql의 설정파일인 application-init-mysql.yml 내용을 보면 아래와 같다.

    spring:
      flyway:
        enabled: false
      sql:
        init:
          mode: always
          schema-locations: 
            - classpath*:/schemas/mysql/V1-dataflow.sql
            - classpath*:/schemas/mysql/V1-skipper.sql
            - classpath*:/schemas/mysql/V2-dataflow.sql
            - classpath*:/schemas/mysql/V2-dataflow-after.sql
            - classpath*:/schemas/mysql/V3-dataflow.sql
            - classpath*:/schemas/mysql/V4-dataflow.sql
            - classpath*:/schemas/mysql/V5-dataflow.sql
            - classpath*:/schemas/mysql/V6-dataflow.sql
            - classpath*:/schemas/mysql/V7-dataflow.sql
            - classpath*:/schemas/mysql/V9-dataflow.sql
            - classpath*:/schemas/mysql/V8-dataflow.sql
            - classpath*:/schemas/mysql/V9-dataflow.sql
            - classpath*:/schemas/mysql/V10-dataflow.sql

    문제는

    - classpath*:/schemas/mysql/V9-dataflow.sql # 중복호출
    - classpath*:/schemas/mysql/V8-dataflow.sql
    - classpath*:/schemas/mysql/V9-dataflow.sql # 중복호출
    - classpath*:/schemas/mysql/V10-dataflow.sql

    V9가 2번 호출 되면서 View 생성 에러가 발생한다.

    spring:
      flyway:
        enabled: false
      sql:
        init:
          mode: always
          schema-locations:
            - classpath*:/schemas/mariadb/V1-dataflow.sql
            - classpath*:/schemas/mariadb/V1-skipper.sql
            - classpath*:/schemas/mariadb/V2-dataflow.sql
            - classpath*:/schemas/mariadb/V2-dataflow-after.sql
            - classpath*:/schemas/mariadb/V3-dataflow.sql
            - classpath*:/schemas/mariadb/V4-dataflow.sql
            - classpath*:/schemas/mariadb/V5-dataflow.sql
            - classpath*:/schemas/mariadb/V6-dataflow.sql
            - classpath*:/schemas/mariadb/V7-dataflow.sql
            - classpath*:/schemas/mariadb/V9-dataflow.sql
            - classpath*:/schemas/mariadb/V8-dataflow.sql

    그래서 이와 유사한 application-init-mariadb.yml를 보면 위와 같이 설정되어 있고 아래와 같은 순서로 sql 스크립트를 실행하고 있다.

        - classpath*:/schemas/mariadb/V7-dataflow.sql
        - classpath*:/schemas/mariadb/V9-dataflow.sql
        - classpath*:/schemas/mariadb/V8-dataflow.sql

    그러면 init-mysql에서는 V9를 왜 중복 호출하는지에 대해 유추 해보자면 mysql에서는 V11 sql 파일이 존재하는데 설정에는 누락 되어 있는것으로 보아 오타이지 않을까 한다. 만약 사실이라면 아래와 같은 형태가 되어야 하지 않을까 한다.

    spring:
      flyway:
        enabled: false
      sql:
        init:
          mode: always
          schema-locations: 
            - classpath*:/schemas/mysql/V1-dataflow.sql
            - classpath*:/schemas/mysql/V1-skipper.sql
            - classpath*:/schemas/mysql/V2-dataflow.sql
            - classpath*:/schemas/mysql/V2-dataflow-after.sql
            - classpath*:/schemas/mysql/V3-dataflow.sql
            - classpath*:/schemas/mysql/V4-dataflow.sql
            - classpath*:/schemas/mysql/V5-dataflow.sql
            - classpath*:/schemas/mysql/V6-dataflow.sql
            - classpath*:/schemas/mysql/V7-dataflow.sql
            - classpath*:/schemas/mysql/V9-dataflow.sql
            - classpath*:/schemas/mysql/V8-dataflow.sql
            - classpath*:/schemas/mysql/V10-dataflow.sql # V9 중복 호출을 V10으로 변경
            - classpath*:/schemas/mysql/V11-dataflow.sql # V11 신규 추가

    아쉽게도 해당 문제를 해결하기 위해선 Spring에서 수정해줘야 하는 상황이며 그전까지는 mysql schemas에 있는 sql 파일을 위 정답이라 생각되는 설정 파일에 순서에 맞춰 실행해줘야 한다. 그러나 위 해당 스키마 파일로 실행하게 되면 2.11.x 이상 버전의 mysql용 메타 테이블 생성 쿼리 이슈가 있다.

    Spring Batch와 SCDF 메타테이블 이슈

    SCDF에선 Spring Batch 메타테이블을 Spring boot 2.x와 Spring boot 3.x가 구분된 각 버전별 메타테이블을 별도로 생성해야 한다.

    이때 Spring Batch 5를 먼저 사용 중에 있다면 테이블명이 충돌 되기 때문에 SCDF 기준으로 기존 Spring Batch 5 메타테이블 prefix를 변경 하던가 아예 새로 만들어야 한다. 만약 변경하지 않고 Spring Batch를 단일 실행 하게 될 때 SCDF의 Spring boot 2.x 메타테이블을 사용할 수 있어BATCH_JOB_EXECUTION_PARAMS 테이블에 저장할 때 에러가 발생된다. (Spring Batch가 버전업 되면서 스키마도 변경 됨.)

    만약 기존 Spring Batch 5가 아닌 SCDF 메타테이블을 사용한다면 기존 Spring Batch 5에 아래와 같이 옵션을 지정하면 된다.

    spring:
      batch:
        jdbc:
          table-prefix: BOOT3_BATCH_

    그래도 사소한 문제가 있는게 SCDF 메타테이블을 같이 사용하기 때문에 SCDF로의 배치 실행이 아닌 단일 실행 시에도 SCDF 대시보드의 Task executions 메뉴 리스트에 단일 실행된 결과가 노출된다.

    특이사항

    • Spring boot 3 지원은 2.11.x 부터 지원 시작.

    • 2.11.x 이상부터는 Spring boot 3 지원 때문인지 기존 메타 테이블외 boot3 메타 테이블을 별도 생성해야 함.

      • SCDF & TASK 메타 테이블인 경우엔 BOOT3_TASK_

      • Batch 메타 테이블인 경우엔 BOOT3_BATCH_

        어플리케이션을 등록할 때 Spring Boot version 부분을 3.x로 설정하게 되면 테이블 명에 BOOT3이 있는 메타 테이블을 사용하고 2.x로 설정하면 기존 메타 테이블을 사용. 해당 부분은 Spring에서 강제하기에 별도 설정은 없는것으로 보임.

    반응형

    'Spring' 카테고리의 다른 글

    Spring & JTA(분산 트랜잭션)  (1) 2023.10.30
    Spring과 Builder Pattern  (0) 2023.04.05
    Spring과 Abstract Factory Pattern  (0) 2023.02.01
    Spring과 Factory Pattern  (1) 2023.01.19
    Spring과 Factory Method Pattern  (0) 2023.01.18

    댓글

Designed by Tistory.