Python 스케줄링 완벽 가이드: schedule vs APScheduler 비교 분석

“Python으로 자동화 작업을 일정에 맞춰 실행하고 싶은데, 어떤 라이브러리를 선택해야 할까요?”

매일 특정 시간에 리포트를 생성하거나, 주기적으로 데이터를 수집하거나, 정해진 시간에 알림을 보내는 등의 작업을 자동화하려면 스케줄링이 필수입니다. 이 글에서는 Python의 대표적인 스케줄링 라이브러리인 schedule과 APScheduler를 깊이 있게 비교하고, 실전 예제로 활용법을 알려드립니다.

Table of Contents

스케줄링이 필요한 순간들

실무에서 자주 쓰이는 사례

  • 데이터 수집: 매일 새벽 3시에 웹 크롤링 실행
  • 리포트 생성: 매주 월요일 오전 9시에 주간 보고서 자동 생성
  • 알림 발송: 매일 출근 시간에 텔레그램/슬랙 메시지 전송
  • 백업 작업: 매일 자정에 데이터베이스 백업
  • 모니터링: 5분마다 서버 상태 체크

Schedule vs APScheduler 한눈에 비교

특징scheduleAPScheduler
난이도초보자 친화적중급 이상 권장
코드 가독성매우 직관적설정 기반
기능 범위기본 스케줄링고급 기능 다수
동시 실행어려움쉬움 (멀티스레드)
작업 관리제한적동적 추가/제거
Cron 표현식미지원지원
적합한 프로젝트간단한 자동화복잡한 배치 시스템

Schedule: 직관적이고 간단한 스케줄링

설치 방법

pip install schedule

기본 사용법

schedule의 가장 큰 장점은 영어 문장처럼 읽히는 직관적인 문법입니다.

import schedule
import time

def job():
    print("작업 실행 중...")
    print(f"실행 시간: {time.strftime('%Y-%m-%d %H:%M:%S')}")

# 10초마다 실행
schedule.every(10).seconds.do(job)

# 10분마다 실행
schedule.every(10).minutes.do(job)

# 매 시간 실행
schedule.every().hour.do(job)

# 매일 오전 10시 30분에 실행
schedule.every().day.at("10:30").do(job)

# 매주 월요일 실행
schedule.every().monday.do(job)

# 매주 수요일 오후 1시 15분에 실행
schedule.every().wednesday.at("13:15").do(job)

# 스케줄 실행
while True:
    schedule.run_pending()
    time.sleep(1)

파라미터 전달하기

함수에 인자를 전달할 수 있습니다.

def send_notification(name, message):
    print(f"{name}님께 메시지 전송: {message}")

# 파라미터와 함께 스케줄 등록
schedule.every().day.at("09:00").do(
    send_notification,
    name="홍길동",
    message="좋은 아침입니다!"
)

while True:
    schedule.run_pending()
    time.sleep(1)

작업 취소하기

등록된 스케줄을 취소하는 방법입니다.

import schedule

def morning_job():
    print("아침 작업 실행")
    return schedule.CancelJob  # 이 작업을 한 번만 실행

# 매일 실행되지만 첫 실행 후 자동 취소
schedule.every().day.at("06:00").do(morning_job)

# 또는 명시적으로 취소
job = schedule.every().hour.do(print, "매 시간 실행")
schedule.cancel_job(job)  # 작업 취소

특정 시간까지만 실행하기

from datetime import datetime, timedelta

def limited_job():
    print("제한된 시간 동안만 실행되는 작업")

# 오늘 자정까지만 실행
schedule.every(10).minutes.until("23:59").do(limited_job)

# 특정 날짜까지만 실행
end_date = datetime.now() + timedelta(days=7)
schedule.every().day.until(end_date).do(limited_job)

while True:
    schedule.run_pending()
    time.sleep(1)

태그를 활용한 작업 그룹 관리

import schedule

def morning_task():
    print("아침 작업")

def evening_task():
    print("저녁 작업")

def urgent_task():
    print("긴급 작업")

# 태그 설정
schedule.every().day.at("08:00").do(morning_task).tag('morning', 'daily')
schedule.every().day.at("18:00").do(evening_task).tag('evening', 'daily')
schedule.every(5).minutes.do(urgent_task).tag('urgent')

# 특정 태그의 작업만 취소
schedule.clear('morning')  # 아침 작업만 취소
schedule.clear('daily')    # 일일 작업 모두 취소

while True:
    schedule.run_pending()
    time.sleep(1)

실전 예제: 매일 리포트 생성

import schedule
import time
from datetime import datetime
import pandas as pd

def generate_daily_report():
    """일일 리포트 생성"""
    print(f"[{datetime.now()}] 리포트 생성 시작")

    # 데이터 수집 (예시)
    data = {
        '날짜': [datetime.now().strftime('%Y-%m-%d')],
        '방문자': [1234],
        '매출': [567890]
    }

    df = pd.DataFrame(data)
    filename = f"report_{datetime.now().strftime('%Y%m%d')}.csv"
    df.to_csv(filename, index=False, encoding='utf-8-sig')

    print(f"리포트 저장 완료: {filename}")

# 매일 오전 9시에 리포트 생성
schedule.every().day.at("09:00").do(generate_daily_report)

# 주말에는 실행하지 않기
def is_weekday():
    return datetime.now().weekday() < 5

def weekday_report():
    if is_weekday():
        generate_daily_report()

schedule.every().day.at("09:00").do(weekday_report)

print("스케줄러 시작...")
while True:
    schedule.run_pending()
    time.sleep(60)  # 1분마다 체크

APScheduler: 강력하고 유연한 고급 스케줄링

설치 방법

pip install apscheduler

스케줄러 종류 이해하기

APScheduler는 용도에 따라 다른 스케줄러를 제공합니다.

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler

# BlockingScheduler: 단일 작업, 메인 스레드 차단
# 순수 스케줄링 전용 스크립트에 사용
blocking_sched = BlockingScheduler()

# BackgroundScheduler: 다중 작업, 백그라운드 실행
# 웹 애플리케이션이나 다른 작업과 병행 시 사용
background_sched = BackgroundScheduler()

3가지 트리거 방식

APScheduler는 세 가지 스케줄링 방식을 지원합니다.

1. Interval 트리거 (일정 간격 반복)

from apscheduler.schedulers.background import BackgroundScheduler
import time

def interval_job():
    print(f"[{time.strftime('%H:%M:%S')}] Interval 작업 실행")

sched = BackgroundScheduler()
sched.start()

# 3초마다 실행
sched.add_job(interval_job, 'interval', seconds=3, id='interval_job')

# 10분마다 실행
sched.add_job(interval_job, 'interval', minutes=10, id='minute_job')

# 1시간마다 실행
sched.add_job(interval_job, 'interval', hours=1, id='hour_job')

# 메인 프로그램 계속 실행
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()

2. Cron 트리거 (특정 시간 실행)

Cron 표현식으로 정교한 스케줄링이 가능합니다.

from apscheduler.schedulers.background import BackgroundScheduler
import time

def cron_job():
    print(f"[{time.strftime('%H:%M:%S')}] Cron 작업 실행")

sched = BackgroundScheduler()
sched.start()

# 매 5초마다 실행 (*/5는 5의 배수 초에 실행)
sched.add_job(cron_job, 'cron', second='*/5', id='every_5_sec')

# 매일 오전 9시에 실행
sched.add_job(cron_job, 'cron', hour=9, minute=0, id='daily_9am')

# 평일 오전 9시에만 실행
sched.add_job(
    cron_job,
    'cron',
    day_of_week='mon-fri',  # 월~금
    hour=9,
    minute=0,
    id='weekday_morning'
)

# 매월 1일 오전 10시에 실행
sched.add_job(
    cron_job,
    'cron',
    day=1,  # 1일
    hour=10,
    minute=0,
    id='monthly_report'
)

# 매시간 정각과 30분에 실행
sched.add_job(
    cron_job,
    'cron',
    minute='0,30',  # 0분과 30분
    id='twice_hourly'
)

try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()

3. Date 트리거 (일회성 실행)

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta

def one_time_job():
    print("일회성 작업 실행")

sched = BackgroundScheduler()
sched.start()

# 특정 시간에 한 번만 실행
run_date = datetime.now() + timedelta(seconds=10)
sched.add_job(
    one_time_job,
    'date',
    run_date=run_date,
    id='one_time'
)

# 2025년 12월 31일 23시 59분에 실행
sched.add_job(
    one_time_job,
    'date',
    run_date=datetime(2025, 12, 31, 23, 59, 0),
    id='new_year_countdown'
)

파라미터 전달하기

from apscheduler.schedulers.background import BackgroundScheduler

def job_with_params(name, age, city="서울"):
    print(f"이름: {name}, 나이: {age}, 도시: {city}")

sched = BackgroundScheduler()
sched.start()

# args로 위치 인자 전달 (리스트나 튜플)
sched.add_job(
    job_with_params,
    'interval',
    seconds=5,
    args=["홍길동", 30],  # 반드시 리스트/튜플
    id='param_job_1'
)

# kwargs로 키워드 인자 전달 (딕셔너리)
sched.add_job(
    job_with_params,
    'interval',
    seconds=5,
    kwargs={"name": "김철수", "age": 25, "city": "부산"},
    id='param_job_2'
)

# args와 kwargs 혼합
sched.add_job(
    job_with_params,
    'interval',
    seconds=5,
    args=["이영희", 28],
    kwargs={"city": "대구"},
    id='param_job_3'
)

작업 동적 관리하기

APScheduler의 강력한 기능 중 하나는 실행 중에 작업을 추가/제거/수정할 수 있다는 점입니다.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.base import JobLookupError
import time

def job():
    print(f"[{time.strftime('%H:%M:%S')}] 작업 실행")

sched = BackgroundScheduler()
sched.start()

# 작업 추가
sched.add_job(job, 'interval', seconds=3, id='removable_job')

count = 0
while True:
    print("메인 프로세스 실행 중...")
    time.sleep(1)
    count += 1

    # 10초 후 작업 제거
    if count == 10:
        try:
            sched.remove_job('removable_job')
            print("작업 제거 완료!")
        except JobLookupError:
            print("작업을 찾을 수 없습니다")
        break

sched.shutdown()

작업 일시 중지 및 재개

from apscheduler.schedulers.background import BackgroundScheduler

def pausable_job():
    print("일시 중지 가능한 작업")

sched = BackgroundScheduler()
sched.start()

sched.add_job(pausable_job, 'interval', seconds=2, id='pause_test')

# 5초 후 일시 중지
time.sleep(5)
sched.pause_job('pause_test')
print("작업 일시 중지")

# 5초 후 재개
time.sleep(5)
sched.resume_job('pause_test')
print("작업 재개")

작업 수정하기

from apscheduler.schedulers.background import BackgroundScheduler

def modifiable_job():
    print("수정 가능한 작업")

sched = BackgroundScheduler()
sched.start()

# 처음에는 5초마다
sched.add_job(modifiable_job, 'interval', seconds=5, id='mod_job')

# 10초 후 간격을 2초로 변경
time.sleep(10)
sched.reschedule_job('mod_job', trigger='interval', seconds=2)
print("작업 간격 변경: 5초 → 2초")

실전 예제 1: 텔레그램 봇 자동 알림

from apscheduler.schedulers.background import BackgroundScheduler
import telegram
from datetime import datetime

# 텔레그램 봇 설정
TOKEN = 'your_bot_token'
CHAT_ID = 'your_chat_id'
bot = telegram.Bot(token=TOKEN)

def send_morning_message():
    """아침 인사 메시지"""
    message = f"좋은 아침입니다! 😊\\n오늘은 {datetime.now().strftime('%Y년 %m월 %d일')}입니다."
    bot.send_message(chat_id=CHAT_ID, text=message)
    print(f"[{datetime.now()}] 아침 메시지 전송 완료")

def send_lunch_reminder():
    """점심 알림 메시지"""
    message = "점심 시간입니다! 🍽️"
    bot.send_message(chat_id=CHAT_ID, text=message)
    print(f"[{datetime.now()}] 점심 알림 전송 완료")

def send_evening_message():
    """저녁 인사 메시지"""
    message = "수고하셨습니다! 좋은 저녁 되세요 🌙"
    bot.send_message(chat_id=CHAT_ID, text=message)
    print(f"[{datetime.now()}] 저녁 메시지 전송 완료")

# 스케줄러 설정
sched = BackgroundScheduler(timezone='Asia/Seoul')
sched.start()

# 평일 오전 8시 - 아침 인사
sched.add_job(
    send_morning_message,
    'cron',
    day_of_week='mon-fri',
    hour=8,
    minute=0,
    id='morning_msg'
)

# 평일 낮 12시 - 점심 알림
sched.add_job(
    send_lunch_reminder,
    'cron',
    day_of_week='mon-fri',
    hour=12,
    minute=0,
    id='lunch_reminder'
)

# 평일 오후 6시 - 퇴근 인사
sched.add_job(
    send_evening_message,
    'cron',
    day_of_week='mon-fri',
    hour=18,
    minute=0,
    id='evening_msg'
)

print("텔레그램 봇 스케줄러 시작...")

try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()
    print("스케줄러 종료")

실전 예제 2: 웹 크롤링 자동화

from apscheduler.schedulers.blocking import BlockingScheduler
import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime

def crawl_news():
    """뉴스 크롤링 작업"""
    print(f"[{datetime.now()}] 크롤링 시작...")

    try:
        # 예시: 간단한 크롤링
        url = "<https://news.example.com>"
        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')

        # 데이터 추출 (예시)
        titles = [title.text for title in soup.select('.news-title')[:10]]

        # 데이터프레임 생성 및 저장
        df = pd.DataFrame({
            '수집시간': [datetime.now()] * len(titles),
            '제목': titles
        })

        filename = f"news_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
        df.to_csv(filename, index=False, encoding='utf-8-sig')

        print(f"크롤링 완료: {len(titles)}개 기사 수집")
        print(f"파일 저장: {filename}")

    except Exception as e:
        print(f"크롤링 오류: {str(e)}")

# BlockingScheduler 사용 (단일 작업 전용)
sched = BlockingScheduler(timezone='Asia/Seoul')

# 매일 오전 6시, 정오, 오후 6시에 크롤링
sched.add_job(crawl_news, 'cron', hour='6,12,18', minute=0, id='news_crawler')

# 또는 4시간마다 크롤링
# sched.add_job(crawl_news, 'interval', hours=4, id='news_crawler')

print("뉴스 크롤러 시작...")
try:
    sched.start()
except (KeyboardInterrupt, SystemExit):
    pass

실전 예제 3: 데이터베이스 백업

from apscheduler.schedulers.background import BackgroundScheduler
import sqlite3
import shutil
from datetime import datetime
import os

def backup_database():
    """데이터베이스 백업"""
    print(f"[{datetime.now()}] 백업 시작...")

    try:
        # 백업 폴더 생성
        backup_dir = 'backups'
        if not os.path.exists(backup_dir):
            os.makedirs(backup_dir)

        # 원본 DB 파일
        source_db = 'mydata.db'

        # 백업 파일명 (타임스탬프 포함)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{backup_dir}/backup_{timestamp}.db"

        # 파일 복사
        shutil.copy2(source_db, backup_file)

        # 백업 파일 크기 확인
        size_mb = os.path.getsize(backup_file) / (1024 * 1024)

        print(f"백업 완료: {backup_file}")
        print(f"파일 크기: {size_mb:.2f} MB")

        # 오래된 백업 파일 삭제 (30일 이상)
        cleanup_old_backups(backup_dir, days=30)

    except Exception as e:
        print(f"백업 오류: {str(e)}")

def cleanup_old_backups(backup_dir, days=30):
    """오래된 백업 파일 삭제"""
    from datetime import timedelta

    cutoff_time = datetime.now() - timedelta(days=days)

    for filename in os.listdir(backup_dir):
        filepath = os.path.join(backup_dir, filename)
        file_time = datetime.fromtimestamp(os.path.getmtime(filepath))

        if file_time < cutoff_time:
            os.remove(filepath)
            print(f"오래된 백업 삭제: {filename}")

# 스케줄러 설정
sched = BackgroundScheduler(timezone='Asia/Seoul')
sched.start()

# 매일 새벽 3시에 백업
sched.add_job(
    backup_database,
    'cron',
    hour=3,
    minute=0,
    id='daily_backup'
)

# 또는 6시간마다 백업
# sched.add_job(backup_database, 'interval', hours=6, id='periodic_backup')

print("데이터베이스 백업 스케줄러 시작...")

고급 기능 활용하기

작업 실행 이력 및 모니터링

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def monitored_job():
    """모니터링되는 작업"""
    print("작업 실행 중...")
    # 일부러 오류 발생시키기 (테스트용)
    # raise Exception("테스트 오류")

def job_listener(event):
    """작업 이벤트 리스너"""
    if event.exception:
        logger.error(f"작업 실패: {event.job_id} - {event.exception}")
    else:
        logger.info(f"작업 성공: {event.job_id}")

# 스케줄러 설정
sched = BackgroundScheduler()

# 이벤트 리스너 등록
sched.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

sched.start()
sched.add_job(monitored_job, 'interval', seconds=10, id='monitored')

try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    sched.shutdown()

Job Store 활용 (영구 저장)

기본적으로 APScheduler는 메모리에 작업을 저장하지만, 데이터베이스에 저장하여 재시작 후에도 유지할 수 있습니다.

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Job Store 설정 (SQLite 사용)
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

sched = BackgroundScheduler(jobstores=jobstores)
sched.start()

def persistent_job():
    print("영구 저장된 작업 실행")

# replace_existing=True로 중복 방지
sched.add_job(
    persistent_job,
    'interval',
    minutes=10,
    id='persistent_job',
    replace_existing=True  # 기존 작업이 있으면 교체
)

# 프로그램을 재시작해도 작업이 유지됩니다

여러 스케줄러 동시 사용

from apscheduler.schedulers.background import BackgroundScheduler

def priority_job():
    print("높은 우선순위 작업")

def normal_job():
    print("일반 작업")

# 우선순위별 스케줄러 분리
priority_sched = BackgroundScheduler()
normal_sched = BackgroundScheduler()

priority_sched.start()
normal_sched.start()

# 각각 다른 작업 등록
priority_sched.add_job(priority_job, 'interval', seconds=5)
normal_sched.add_job(normal_job, 'interval', seconds=10)

에러 처리 및 안정성

재시도 로직 구현

from apscheduler.schedulers.background import BackgroundScheduler
from functools import wraps
import time

def retry(max_attempts=3, delay=2):
    """재시도 데코레이터"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"모든 시도 실패: {str(e)}")
                        raise
                    print(f"시도 {attempt + 1} 실패. {delay}초 후 재시도...")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2)
def unstable_job():
    """불안정한 작업 (예: 네트워크 요청)"""
    import random
    if random.random() < 0.5:  # 50% 확률로 실패
        raise Exception("작업 실패")
    print("작업 성공!")

sched = BackgroundScheduler()
sched.start()
sched.add_job(unstable_job, 'interval', seconds=10)

타임아웃 설정

from apscheduler.schedulers.background import BackgroundScheduler
import signal
from contextlib import contextmanager

class TimeoutException(Exception):
    pass

@contextmanager
def time_limit(seconds):
    """타임아웃 컨텍스트 매니저"""
    def signal_handler(signum, frame):
        raise TimeoutException("작업 시간 초과")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)

def long_running_job():
    """오래 걸리는 작업"""
    try:
        with time_limit(30):  # 30초 제한
            # 실제 작업 수행
            import time
            time.sleep(10)
            print("작업 완료")
    except TimeoutException:
        print("작업이 30초를 초과하여 강제 종료되었습니다")

sched = BackgroundScheduler()
sched.start()
sched.add_job(long_running_job, 'interval', minutes=5)

실무 활용 패턴

패턴 1: 작업 체이닝 (순차 실행)

from apscheduler.schedulers.background import BackgroundScheduler
import time

def step1():
    print("1단계: 데이터 수집")
    time.sleep(2)
    return True

def step2():
    print("2단계: 데이터 처리")
    time.sleep(2)
    return True

def step3():
    print("3단계: 결과 저장")
    time.sleep(1)
    return True

def chained_job():
    """체인 작업 실행"""
    print("=== 작업 시작 ===")

    if step1():
        if step2():
            step3()

    print("=== 작업 완료 ===")

sched = BackgroundScheduler()
sched.start()
sched.add_job(chained_job, 'cron', hour=3, minute=0)

패턴 2: 조건부 실행

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

def should_run_today():
    """오늘 작업을 실행해야 하는지 확인"""
    # 예: 공휴일이 아닌 평일에만 실행
    today = datetime.now()

    # 주말 체크
    if today.weekday() >= 5:
        return False

    # 공휴일 리스트 (예시)
    holidays = [
        datetime(2025, 1, 1),   # 신정
        datetime(2025, 3, 1),   # 삼일절
        datetime(2025, 5, 5),   # 어린이날
        datetime(2025, 12, 25), # 크리스마스
    ]

    if today.date() in [h.date() for h in holidays]:
        return False

    return True

def conditional_job():
    """조건부 작업"""
    if should_run_today():
        print(f"{datetime.now()}: 작업 실행")
        # 실제 작업 수행
    else:
        print(f"{datetime.now()}: 오늘은 작업 건너뜀")

sched = BackgroundScheduler()
sched.start()
sched.add_job(conditional_job, 'cron', hour=9, minute=0)

패턴 3: 동적 스케줄 변경

from apscheduler.schedulers.background import BackgroundScheduler
import json

def dynamic_job():
    print("동적 스케줄 작업 실행")

def load_schedule_config():
    """외부 설정 파일에서 스케줄 로드"""
    try:
        with open('schedule_config.json', 'r') as f:
            return json.load(f)
    except:
        return {'hour': 9, 'minute': 0}

def update_schedule(sched, job_id):
    """스케줄 동적 업데이트"""
    config = load_schedule_config()

    sched.reschedule_job(
        job_id,
        trigger='cron',
        hour=config['hour'],
        minute=config['minute']
    )
    print(f"스케줄 업데이트: {config['hour']}시 {config['minute']}분")

sched = BackgroundScheduler()
sched.start()

# 초기 스케줄 등록
sched.add_job(dynamic_job, 'cron', hour=9, minute=0, id='dynamic_job')

# 설정 파일이 변경되면 스케줄 업데이트 (예시)
# update_schedule(sched, 'dynamic_job')

성능 최적화

병렬 처리 설정

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Executor 설정
executors = {
    'default': ThreadPoolExecutor(max_workers=5),  # 스레드 풀
    'processpool': ProcessPoolExecutor(max_workers=3)  # 프로세스 풀
}

job_defaults = {
    'coalesce': False,  # 밀린 작업을 하나로 합치지 않음
    'max_instances': 3   # 동시 실행 최대 인스턴스 수
}

sched = BackgroundScheduler(
    executors=executors,
    job_defaults=job_defaults
)

def cpu_intensive_job():
    """CPU 집약적 작업"""
    import time
    result = sum(i**2 for i in range(10000000))
    print(f"계산 완료: {result}")

def io_intensive_job():
    """I/O 집약적 작업"""
    import requests
    response = requests.get('<https://api.example.com/data>')
    print(f"데이터 수신: {len(response.text)} bytes")

sched.start()

# CPU 작업은 프로세스 풀에서 실행
sched.add_job(
    cpu_intensive_job,
    'interval',
    seconds=60,
    executor='processpool'
)

# I/O 작업은 스레드 풀에서 실행 (기본값)
sched.add_job(
    io_intensive_job,
    'interval',
    seconds=30
)

메모리 최적화

from apscheduler.schedulers.background import BackgroundScheduler
import gc

def memory_intensive_job():
    """메모리를 많이 사용하는 작업"""
    # 대량의 데이터 처리
    large_data = [i for i in range(1000000)]
    result = sum(large_data)

    # 명시적 메모리 해제
    del large_data
    gc.collect()

    print(f"작업 완료: {result}")

sched = BackgroundScheduler()
sched.start()
sched.add_job(memory_intensive_job, 'interval', hours=1)

선택 가이드: schedule vs APScheduler

schedule을 선택해야 하는 경우

간단한 스케줄링: 몇 개의 작업만 실행 ✅ 학습 목적: Python 스케줄링 입문 ✅ 빠른 프로토타이핑: 테스트나 POC ✅ 가독성 우선: 코드를 읽기 쉽게 유지 ✅ 의존성 최소화: 가벼운 라이브러리 선호

예시 사용 사례

  • 개인 프로젝트 자동화
  • 간단한 알림 봇
  • 데이터 수집 스크립트

APScheduler를 선택해야 하는 경우

복잡한 스케줄: Cron 표현식 필요 ✅ 동시 실행: 여러 작업 병렬 처리 ✅ 동적 관리: 실행 중 작업 추가/제거 ✅ 영구 저장: 재시작 후 작업 유지 ✅ 엔터프라이즈: 대규모 배치 시스템

예시 사용 사례

  • 기업 배치 시스템
  • 웹 애플리케이션 백그라운드 작업
  • 복잡한 ETL 파이프라인
  • 분산 작업 스케줄링

함께 사용하면 좋은 도구

Docker와 함께 사용하기

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY scheduler.py .

CMD ["python", "scheduler.py"]

# docker-compose.yml
version: '3.8'

services:
  scheduler:
    build: .
    restart: always
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    environment:
      - TZ=Asia/Seoul

systemd 서비스로 등록 (Linux)

# /etc/systemd/system/my-scheduler.service
[Unit]
Description=Python Scheduler Service
After=network.target

[Service]
Type=simple
User=myuser
WorkingDirectory=/home/myuser/scheduler
ExecStart=/usr/bin/python3 /home/myuser/scheduler/main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

# 서비스 시작
sudo systemctl start my-scheduler
sudo systemctl enable my-scheduler
sudo systemctl status my-scheduler

로깅 통합

import logging
from logging.handlers import RotatingFileHandler
from apscheduler.schedulers.background import BackgroundScheduler

# 로깅 설정
logger = logging.getLogger('scheduler')
logger.setLevel(logging.INFO)

# 파일 핸들러 (10MB씩 최대 5개 파일)
file_handler = RotatingFileHandler(
    'scheduler.log',
    maxBytes=10*1024*1024,
    backupCount=5,
    encoding='utf-8'
)

formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

def logged_job():
    """로깅이 적용된 작업"""
    try:
        logger.info("작업 시작")
        # 실제 작업
        result = perform_task()
        logger.info(f"작업 완료: {result}")
    except Exception as e:
        logger.error(f"작업 실패: {str(e)}", exc_info=True)
        raise

def perform_task():
    # 실제 작업 내용
    return "성공"

sched = BackgroundScheduler()
sched.start()
sched.add_job(logged_job, 'interval', minutes=10)

디버깅 팁

schedule 디버깅

import schedule

def debug_job():
    print("작업 실행")

schedule.every(10).seconds.do(debug_job)

# 현재 등록된 모든 작업 확인
print("등록된 작업:", schedule.get_jobs())

# 다음 실행 시간 확인
for job in schedule.get_jobs():
    print(f"작업: {job.job_func.__name__}")
    print(f"다음 실행: {job.next_run}")
    print(f"실행 간격: {job.interval} {job.unit}")

APScheduler 디버깅

from apscheduler.schedulers.background import BackgroundScheduler
import logging

# APScheduler 로깅 활성화
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

sched = BackgroundScheduler()
sched.start()

def debug_job():
    print("디버그 작업")

sched.add_job(debug_job, 'interval', seconds=5, id='debug_job')

# 작업 정보 출력
job = sched.get_job('debug_job')
print(f"작업 ID: {job.id}")
print(f"다음 실행 시간: {job.next_run_time}")
print(f"트리거: {job.trigger}")

# 모든 작업 리스트
for job in sched.get_jobs():
    print(f"- {job.id}: {job.next_run_time}")

자주 발생하는 문제와 해결법

문제 1: 작업이 실행되지 않음

원인: while 루프가 너무 빨리 실행됨

# ❌ 잘못된 예
import schedule
schedule.every(10).seconds.do(job)
# while 루프 없음 - 작업이 실행 안 됨

# ✅ 올바른 예
import schedule
import time
schedule.every(10).seconds.do(job)
while True:
    schedule.run_pending()
    time.sleep(1)  # 1초마다 체크

문제 2: 시간대(Timezone) 문제

from apscheduler.schedulers.background import BackgroundScheduler
from pytz import timezone

# ❌ 잘못된 예 - 서버 시간대 사용
sched = BackgroundScheduler()

# ✅ 올바른 예 - 명시적 시간대 지정
sched = BackgroundScheduler(timezone='Asia/Seoul')

# 또는 pytz 사용
korea_tz = timezone('Asia/Seoul')
sched = BackgroundScheduler(timezone=korea_tz)

문제 3: 중복 실행

from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()
sched.start()

# ❌ 잘못된 예 - 같은 ID로 여러 번 추가
sched.add_job(job, 'interval', seconds=10, id='my_job')
sched.add_job(job, 'interval', seconds=10, id='my_job')  # 오류 발생!

# ✅ 올바른 예 - replace_existing 사용
sched.add_job(
    job,
    'interval',
    seconds=10,
    id='my_job',
    replace_existing=True  # 기존 작업 교체
)

문제 4: 메모리 누수

# ❌ 잘못된 예
def leaky_job():
    global large_data
    large_data = [i for i in range(10000000)]
    # 메모리 해제 안 함

# ✅ 올바른 예
def clean_job():
    large_data = [i for i in range(10000000)]
    result = process(large_data)

    # 명시적 메모리 정리
    del large_data
    import gc
    gc.collect()

    return result

마무리하며

Python 스케줄링은 자동화의 핵심 기술입니다. 간단한 작업에는 schedule을, 복잡한 시스템에는 APScheduler를 선택하세요.

핵심 요약

  • schedule: 직관적, 간단, 빠른 시작
  • APScheduler: 강력함, 유연함, 엔터프라이즈급

실무 적용 체크리스트

  • ✅ 적절한 라이브러리 선택
  • ✅ 에러 처리 및 로깅 구현
  • ✅ 시간대 명시적 설정
  • ✅ 재시도 로직 구현
  • ✅ 모니터링 시스템 연동
  • ✅ 작업 실행 이력 관리

스케줄링은 한 번 설정하면 끝이 아닙니다. 주기적으로 로그를 확인하고, 작업이 제대로 실행되는지 모니터링하며, 필요에 따라 스케줄을 조정하세요.


참고 자료

댓글 남기기