“Python으로 자동화 작업을 일정에 맞춰 실행하고 싶은데, 어떤 라이브러리를 선택해야 할까요?”
매일 특정 시간에 리포트를 생성하거나, 주기적으로 데이터를 수집하거나, 정해진 시간에 알림을 보내는 등의 작업을 자동화하려면 스케줄링이 필수입니다. 이 글에서는 Python의 대표적인 스케줄링 라이브러리인 schedule과 APScheduler를 깊이 있게 비교하고, 실전 예제로 활용법을 알려드립니다.
스케줄링이 필요한 순간들
실무에서 자주 쓰이는 사례
- 데이터 수집: 매일 새벽 3시에 웹 크롤링 실행
- 리포트 생성: 매주 월요일 오전 9시에 주간 보고서 자동 생성
- 알림 발송: 매일 출근 시간에 텔레그램/슬랙 메시지 전송
- 백업 작업: 매일 자정에 데이터베이스 백업
- 모니터링: 5분마다 서버 상태 체크
Schedule vs APScheduler 한눈에 비교
| 특징 | schedule | APScheduler |
|---|---|---|
| 난이도 | 초보자 친화적 | 중급 이상 권장 |
| 코드 가독성 | 매우 직관적 | 설정 기반 |
| 기능 범위 | 기본 스케줄링 | 고급 기능 다수 |
| 동시 실행 | 어려움 | 쉬움 (멀티스레드) |
| 작업 관리 | 제한적 | 동적 추가/제거 |
| Cron 표현식 | 미지원 | 지원 |
| 적합한 프로젝트 | 간단한 자동화 | 복잡한 배치 시스템 |
Schedule: 직관적이고 간단한 스케줄링
설치 방법
pip install schedule
기본 사용법
schedule의 가장 큰 장점은 영어 문장처럼 읽히는 직관적인 문법입니다.
import schedule
import time
def job():
print("작업 실행 중...")
print(f"실행 시간: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 10초마다 실행
schedule.every(10).seconds.do(job)
# 10분마다 실행
schedule.every(10).minutes.do(job)
# 매 시간 실행
schedule.every().hour.do(job)
# 매일 오전 10시 30분에 실행
schedule.every().day.at("10:30").do(job)
# 매주 월요일 실행
schedule.every().monday.do(job)
# 매주 수요일 오후 1시 15분에 실행
schedule.every().wednesday.at("13:15").do(job)
# 스케줄 실행
while True:
schedule.run_pending()
time.sleep(1)
파라미터 전달하기
함수에 인자를 전달할 수 있습니다.
def send_notification(name, message):
print(f"{name}님께 메시지 전송: {message}")
# 파라미터와 함께 스케줄 등록
schedule.every().day.at("09:00").do(
send_notification,
name="홍길동",
message="좋은 아침입니다!"
)
while True:
schedule.run_pending()
time.sleep(1)
작업 취소하기
등록된 스케줄을 취소하는 방법입니다.
import schedule
def morning_job():
print("아침 작업 실행")
return schedule.CancelJob # 이 작업을 한 번만 실행
# 매일 실행되지만 첫 실행 후 자동 취소
schedule.every().day.at("06:00").do(morning_job)
# 또는 명시적으로 취소
job = schedule.every().hour.do(print, "매 시간 실행")
schedule.cancel_job(job) # 작업 취소
특정 시간까지만 실행하기
from datetime import datetime, timedelta
def limited_job():
print("제한된 시간 동안만 실행되는 작업")
# 오늘 자정까지만 실행
schedule.every(10).minutes.until("23:59").do(limited_job)
# 특정 날짜까지만 실행
end_date = datetime.now() + timedelta(days=7)
schedule.every().day.until(end_date).do(limited_job)
while True:
schedule.run_pending()
time.sleep(1)
태그를 활용한 작업 그룹 관리
import schedule
def morning_task():
print("아침 작업")
def evening_task():
print("저녁 작업")
def urgent_task():
print("긴급 작업")
# 태그 설정
schedule.every().day.at("08:00").do(morning_task).tag('morning', 'daily')
schedule.every().day.at("18:00").do(evening_task).tag('evening', 'daily')
schedule.every(5).minutes.do(urgent_task).tag('urgent')
# 특정 태그의 작업만 취소
schedule.clear('morning') # 아침 작업만 취소
schedule.clear('daily') # 일일 작업 모두 취소
while True:
schedule.run_pending()
time.sleep(1)
실전 예제: 매일 리포트 생성
import schedule
import time
from datetime import datetime
import pandas as pd
def generate_daily_report():
"""일일 리포트 생성"""
print(f"[{datetime.now()}] 리포트 생성 시작")
# 데이터 수집 (예시)
data = {
'날짜': [datetime.now().strftime('%Y-%m-%d')],
'방문자': [1234],
'매출': [567890]
}
df = pd.DataFrame(data)
filename = f"report_{datetime.now().strftime('%Y%m%d')}.csv"
df.to_csv(filename, index=False, encoding='utf-8-sig')
print(f"리포트 저장 완료: {filename}")
# 매일 오전 9시에 리포트 생성
schedule.every().day.at("09:00").do(generate_daily_report)
# 주말에는 실행하지 않기
def is_weekday():
return datetime.now().weekday() < 5
def weekday_report():
if is_weekday():
generate_daily_report()
schedule.every().day.at("09:00").do(weekday_report)
print("스케줄러 시작...")
while True:
schedule.run_pending()
time.sleep(60) # 1분마다 체크
APScheduler: 강력하고 유연한 고급 스케줄링
설치 방법
pip install apscheduler
스케줄러 종류 이해하기
APScheduler는 용도에 따라 다른 스케줄러를 제공합니다.
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
# BlockingScheduler: 단일 작업, 메인 스레드 차단
# 순수 스케줄링 전용 스크립트에 사용
blocking_sched = BlockingScheduler()
# BackgroundScheduler: 다중 작업, 백그라운드 실행
# 웹 애플리케이션이나 다른 작업과 병행 시 사용
background_sched = BackgroundScheduler()
3가지 트리거 방식
APScheduler는 세 가지 스케줄링 방식을 지원합니다.
1. Interval 트리거 (일정 간격 반복)
from apscheduler.schedulers.background import BackgroundScheduler
import time
def interval_job():
print(f"[{time.strftime('%H:%M:%S')}] Interval 작업 실행")
sched = BackgroundScheduler()
sched.start()
# 3초마다 실행
sched.add_job(interval_job, 'interval', seconds=3, id='interval_job')
# 10분마다 실행
sched.add_job(interval_job, 'interval', minutes=10, id='minute_job')
# 1시간마다 실행
sched.add_job(interval_job, 'interval', hours=1, id='hour_job')
# 메인 프로그램 계속 실행
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
sched.shutdown()
2. Cron 트리거 (특정 시간 실행)
Cron 표현식으로 정교한 스케줄링이 가능합니다.
from apscheduler.schedulers.background import BackgroundScheduler
import time
def cron_job():
print(f"[{time.strftime('%H:%M:%S')}] Cron 작업 실행")
sched = BackgroundScheduler()
sched.start()
# 매 5초마다 실행 (*/5는 5의 배수 초에 실행)
sched.add_job(cron_job, 'cron', second='*/5', id='every_5_sec')
# 매일 오전 9시에 실행
sched.add_job(cron_job, 'cron', hour=9, minute=0, id='daily_9am')
# 평일 오전 9시에만 실행
sched.add_job(
cron_job,
'cron',
day_of_week='mon-fri', # 월~금
hour=9,
minute=0,
id='weekday_morning'
)
# 매월 1일 오전 10시에 실행
sched.add_job(
cron_job,
'cron',
day=1, # 1일
hour=10,
minute=0,
id='monthly_report'
)
# 매시간 정각과 30분에 실행
sched.add_job(
cron_job,
'cron',
minute='0,30', # 0분과 30분
id='twice_hourly'
)
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
sched.shutdown()
3. Date 트리거 (일회성 실행)
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
def one_time_job():
print("일회성 작업 실행")
sched = BackgroundScheduler()
sched.start()
# 특정 시간에 한 번만 실행
run_date = datetime.now() + timedelta(seconds=10)
sched.add_job(
one_time_job,
'date',
run_date=run_date,
id='one_time'
)
# 2025년 12월 31일 23시 59분에 실행
sched.add_job(
one_time_job,
'date',
run_date=datetime(2025, 12, 31, 23, 59, 0),
id='new_year_countdown'
)
파라미터 전달하기
from apscheduler.schedulers.background import BackgroundScheduler
def job_with_params(name, age, city="서울"):
print(f"이름: {name}, 나이: {age}, 도시: {city}")
sched = BackgroundScheduler()
sched.start()
# args로 위치 인자 전달 (리스트나 튜플)
sched.add_job(
job_with_params,
'interval',
seconds=5,
args=["홍길동", 30], # 반드시 리스트/튜플
id='param_job_1'
)
# kwargs로 키워드 인자 전달 (딕셔너리)
sched.add_job(
job_with_params,
'interval',
seconds=5,
kwargs={"name": "김철수", "age": 25, "city": "부산"},
id='param_job_2'
)
# args와 kwargs 혼합
sched.add_job(
job_with_params,
'interval',
seconds=5,
args=["이영희", 28],
kwargs={"city": "대구"},
id='param_job_3'
)
작업 동적 관리하기
APScheduler의 강력한 기능 중 하나는 실행 중에 작업을 추가/제거/수정할 수 있다는 점입니다.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.base import JobLookupError
import time
def job():
print(f"[{time.strftime('%H:%M:%S')}] 작업 실행")
sched = BackgroundScheduler()
sched.start()
# 작업 추가
sched.add_job(job, 'interval', seconds=3, id='removable_job')
count = 0
while True:
print("메인 프로세스 실행 중...")
time.sleep(1)
count += 1
# 10초 후 작업 제거
if count == 10:
try:
sched.remove_job('removable_job')
print("작업 제거 완료!")
except JobLookupError:
print("작업을 찾을 수 없습니다")
break
sched.shutdown()
작업 일시 중지 및 재개
from apscheduler.schedulers.background import BackgroundScheduler
def pausable_job():
print("일시 중지 가능한 작업")
sched = BackgroundScheduler()
sched.start()
sched.add_job(pausable_job, 'interval', seconds=2, id='pause_test')
# 5초 후 일시 중지
time.sleep(5)
sched.pause_job('pause_test')
print("작업 일시 중지")
# 5초 후 재개
time.sleep(5)
sched.resume_job('pause_test')
print("작업 재개")
작업 수정하기
from apscheduler.schedulers.background import BackgroundScheduler
def modifiable_job():
print("수정 가능한 작업")
sched = BackgroundScheduler()
sched.start()
# 처음에는 5초마다
sched.add_job(modifiable_job, 'interval', seconds=5, id='mod_job')
# 10초 후 간격을 2초로 변경
time.sleep(10)
sched.reschedule_job('mod_job', trigger='interval', seconds=2)
print("작업 간격 변경: 5초 → 2초")
실전 예제 1: 텔레그램 봇 자동 알림
from apscheduler.schedulers.background import BackgroundScheduler
import telegram
from datetime import datetime
# 텔레그램 봇 설정
TOKEN = 'your_bot_token'
CHAT_ID = 'your_chat_id'
bot = telegram.Bot(token=TOKEN)
def send_morning_message():
"""아침 인사 메시지"""
message = f"좋은 아침입니다! 😊\\n오늘은 {datetime.now().strftime('%Y년 %m월 %d일')}입니다."
bot.send_message(chat_id=CHAT_ID, text=message)
print(f"[{datetime.now()}] 아침 메시지 전송 완료")
def send_lunch_reminder():
"""점심 알림 메시지"""
message = "점심 시간입니다! 🍽️"
bot.send_message(chat_id=CHAT_ID, text=message)
print(f"[{datetime.now()}] 점심 알림 전송 완료")
def send_evening_message():
"""저녁 인사 메시지"""
message = "수고하셨습니다! 좋은 저녁 되세요 🌙"
bot.send_message(chat_id=CHAT_ID, text=message)
print(f"[{datetime.now()}] 저녁 메시지 전송 완료")
# 스케줄러 설정
sched = BackgroundScheduler(timezone='Asia/Seoul')
sched.start()
# 평일 오전 8시 - 아침 인사
sched.add_job(
send_morning_message,
'cron',
day_of_week='mon-fri',
hour=8,
minute=0,
id='morning_msg'
)
# 평일 낮 12시 - 점심 알림
sched.add_job(
send_lunch_reminder,
'cron',
day_of_week='mon-fri',
hour=12,
minute=0,
id='lunch_reminder'
)
# 평일 오후 6시 - 퇴근 인사
sched.add_job(
send_evening_message,
'cron',
day_of_week='mon-fri',
hour=18,
minute=0,
id='evening_msg'
)
print("텔레그램 봇 스케줄러 시작...")
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
sched.shutdown()
print("스케줄러 종료")
실전 예제 2: 웹 크롤링 자동화
from apscheduler.schedulers.blocking import BlockingScheduler
import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime
def crawl_news():
"""뉴스 크롤링 작업"""
print(f"[{datetime.now()}] 크롤링 시작...")
try:
# 예시: 간단한 크롤링
url = "<https://news.example.com>"
response = requests.get(url, timeout=10)
soup = BeautifulSoup(response.text, 'html.parser')
# 데이터 추출 (예시)
titles = [title.text for title in soup.select('.news-title')[:10]]
# 데이터프레임 생성 및 저장
df = pd.DataFrame({
'수집시간': [datetime.now()] * len(titles),
'제목': titles
})
filename = f"news_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
df.to_csv(filename, index=False, encoding='utf-8-sig')
print(f"크롤링 완료: {len(titles)}개 기사 수집")
print(f"파일 저장: {filename}")
except Exception as e:
print(f"크롤링 오류: {str(e)}")
# BlockingScheduler 사용 (단일 작업 전용)
sched = BlockingScheduler(timezone='Asia/Seoul')
# 매일 오전 6시, 정오, 오후 6시에 크롤링
sched.add_job(crawl_news, 'cron', hour='6,12,18', minute=0, id='news_crawler')
# 또는 4시간마다 크롤링
# sched.add_job(crawl_news, 'interval', hours=4, id='news_crawler')
print("뉴스 크롤러 시작...")
try:
sched.start()
except (KeyboardInterrupt, SystemExit):
pass
실전 예제 3: 데이터베이스 백업
from apscheduler.schedulers.background import BackgroundScheduler
import sqlite3
import shutil
from datetime import datetime
import os
def backup_database():
"""데이터베이스 백업"""
print(f"[{datetime.now()}] 백업 시작...")
try:
# 백업 폴더 생성
backup_dir = 'backups'
if not os.path.exists(backup_dir):
os.makedirs(backup_dir)
# 원본 DB 파일
source_db = 'mydata.db'
# 백업 파일명 (타임스탬프 포함)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_file = f"{backup_dir}/backup_{timestamp}.db"
# 파일 복사
shutil.copy2(source_db, backup_file)
# 백업 파일 크기 확인
size_mb = os.path.getsize(backup_file) / (1024 * 1024)
print(f"백업 완료: {backup_file}")
print(f"파일 크기: {size_mb:.2f} MB")
# 오래된 백업 파일 삭제 (30일 이상)
cleanup_old_backups(backup_dir, days=30)
except Exception as e:
print(f"백업 오류: {str(e)}")
def cleanup_old_backups(backup_dir, days=30):
"""오래된 백업 파일 삭제"""
from datetime import timedelta
cutoff_time = datetime.now() - timedelta(days=days)
for filename in os.listdir(backup_dir):
filepath = os.path.join(backup_dir, filename)
file_time = datetime.fromtimestamp(os.path.getmtime(filepath))
if file_time < cutoff_time:
os.remove(filepath)
print(f"오래된 백업 삭제: {filename}")
# 스케줄러 설정
sched = BackgroundScheduler(timezone='Asia/Seoul')
sched.start()
# 매일 새벽 3시에 백업
sched.add_job(
backup_database,
'cron',
hour=3,
minute=0,
id='daily_backup'
)
# 또는 6시간마다 백업
# sched.add_job(backup_database, 'interval', hours=6, id='periodic_backup')
print("데이터베이스 백업 스케줄러 시작...")
고급 기능 활용하기
작업 실행 이력 및 모니터링
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def monitored_job():
"""모니터링되는 작업"""
print("작업 실행 중...")
# 일부러 오류 발생시키기 (테스트용)
# raise Exception("테스트 오류")
def job_listener(event):
"""작업 이벤트 리스너"""
if event.exception:
logger.error(f"작업 실패: {event.job_id} - {event.exception}")
else:
logger.info(f"작업 성공: {event.job_id}")
# 스케줄러 설정
sched = BackgroundScheduler()
# 이벤트 리스너 등록
sched.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
sched.start()
sched.add_job(monitored_job, 'interval', seconds=10, id='monitored')
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
sched.shutdown()
Job Store 활용 (영구 저장)
기본적으로 APScheduler는 메모리에 작업을 저장하지만, 데이터베이스에 저장하여 재시작 후에도 유지할 수 있습니다.
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# Job Store 설정 (SQLite 사용)
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
sched = BackgroundScheduler(jobstores=jobstores)
sched.start()
def persistent_job():
print("영구 저장된 작업 실행")
# replace_existing=True로 중복 방지
sched.add_job(
persistent_job,
'interval',
minutes=10,
id='persistent_job',
replace_existing=True # 기존 작업이 있으면 교체
)
# 프로그램을 재시작해도 작업이 유지됩니다
여러 스케줄러 동시 사용
from apscheduler.schedulers.background import BackgroundScheduler
def priority_job():
print("높은 우선순위 작업")
def normal_job():
print("일반 작업")
# 우선순위별 스케줄러 분리
priority_sched = BackgroundScheduler()
normal_sched = BackgroundScheduler()
priority_sched.start()
normal_sched.start()
# 각각 다른 작업 등록
priority_sched.add_job(priority_job, 'interval', seconds=5)
normal_sched.add_job(normal_job, 'interval', seconds=10)
에러 처리 및 안정성
재시도 로직 구현
from apscheduler.schedulers.background import BackgroundScheduler
from functools import wraps
import time
def retry(max_attempts=3, delay=2):
"""재시도 데코레이터"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
print(f"모든 시도 실패: {str(e)}")
raise
print(f"시도 {attempt + 1} 실패. {delay}초 후 재시도...")
time.sleep(delay)
return wrapper
return decorator
@retry(max_attempts=3, delay=2)
def unstable_job():
"""불안정한 작업 (예: 네트워크 요청)"""
import random
if random.random() < 0.5: # 50% 확률로 실패
raise Exception("작업 실패")
print("작업 성공!")
sched = BackgroundScheduler()
sched.start()
sched.add_job(unstable_job, 'interval', seconds=10)
타임아웃 설정
from apscheduler.schedulers.background import BackgroundScheduler
import signal
from contextlib import contextmanager
class TimeoutException(Exception):
pass
@contextmanager
def time_limit(seconds):
"""타임아웃 컨텍스트 매니저"""
def signal_handler(signum, frame):
raise TimeoutException("작업 시간 초과")
signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
def long_running_job():
"""오래 걸리는 작업"""
try:
with time_limit(30): # 30초 제한
# 실제 작업 수행
import time
time.sleep(10)
print("작업 완료")
except TimeoutException:
print("작업이 30초를 초과하여 강제 종료되었습니다")
sched = BackgroundScheduler()
sched.start()
sched.add_job(long_running_job, 'interval', minutes=5)
실무 활용 패턴
패턴 1: 작업 체이닝 (순차 실행)
from apscheduler.schedulers.background import BackgroundScheduler
import time
def step1():
print("1단계: 데이터 수집")
time.sleep(2)
return True
def step2():
print("2단계: 데이터 처리")
time.sleep(2)
return True
def step3():
print("3단계: 결과 저장")
time.sleep(1)
return True
def chained_job():
"""체인 작업 실행"""
print("=== 작업 시작 ===")
if step1():
if step2():
step3()
print("=== 작업 완료 ===")
sched = BackgroundScheduler()
sched.start()
sched.add_job(chained_job, 'cron', hour=3, minute=0)
패턴 2: 조건부 실행
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
def should_run_today():
"""오늘 작업을 실행해야 하는지 확인"""
# 예: 공휴일이 아닌 평일에만 실행
today = datetime.now()
# 주말 체크
if today.weekday() >= 5:
return False
# 공휴일 리스트 (예시)
holidays = [
datetime(2025, 1, 1), # 신정
datetime(2025, 3, 1), # 삼일절
datetime(2025, 5, 5), # 어린이날
datetime(2025, 12, 25), # 크리스마스
]
if today.date() in [h.date() for h in holidays]:
return False
return True
def conditional_job():
"""조건부 작업"""
if should_run_today():
print(f"{datetime.now()}: 작업 실행")
# 실제 작업 수행
else:
print(f"{datetime.now()}: 오늘은 작업 건너뜀")
sched = BackgroundScheduler()
sched.start()
sched.add_job(conditional_job, 'cron', hour=9, minute=0)
패턴 3: 동적 스케줄 변경
from apscheduler.schedulers.background import BackgroundScheduler
import json
def dynamic_job():
print("동적 스케줄 작업 실행")
def load_schedule_config():
"""외부 설정 파일에서 스케줄 로드"""
try:
with open('schedule_config.json', 'r') as f:
return json.load(f)
except:
return {'hour': 9, 'minute': 0}
def update_schedule(sched, job_id):
"""스케줄 동적 업데이트"""
config = load_schedule_config()
sched.reschedule_job(
job_id,
trigger='cron',
hour=config['hour'],
minute=config['minute']
)
print(f"스케줄 업데이트: {config['hour']}시 {config['minute']}분")
sched = BackgroundScheduler()
sched.start()
# 초기 스케줄 등록
sched.add_job(dynamic_job, 'cron', hour=9, minute=0, id='dynamic_job')
# 설정 파일이 변경되면 스케줄 업데이트 (예시)
# update_schedule(sched, 'dynamic_job')
성능 최적화
병렬 처리 설정
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# Executor 설정
executors = {
'default': ThreadPoolExecutor(max_workers=5), # 스레드 풀
'processpool': ProcessPoolExecutor(max_workers=3) # 프로세스 풀
}
job_defaults = {
'coalesce': False, # 밀린 작업을 하나로 합치지 않음
'max_instances': 3 # 동시 실행 최대 인스턴스 수
}
sched = BackgroundScheduler(
executors=executors,
job_defaults=job_defaults
)
def cpu_intensive_job():
"""CPU 집약적 작업"""
import time
result = sum(i**2 for i in range(10000000))
print(f"계산 완료: {result}")
def io_intensive_job():
"""I/O 집약적 작업"""
import requests
response = requests.get('<https://api.example.com/data>')
print(f"데이터 수신: {len(response.text)} bytes")
sched.start()
# CPU 작업은 프로세스 풀에서 실행
sched.add_job(
cpu_intensive_job,
'interval',
seconds=60,
executor='processpool'
)
# I/O 작업은 스레드 풀에서 실행 (기본값)
sched.add_job(
io_intensive_job,
'interval',
seconds=30
)
메모리 최적화
from apscheduler.schedulers.background import BackgroundScheduler
import gc
def memory_intensive_job():
"""메모리를 많이 사용하는 작업"""
# 대량의 데이터 처리
large_data = [i for i in range(1000000)]
result = sum(large_data)
# 명시적 메모리 해제
del large_data
gc.collect()
print(f"작업 완료: {result}")
sched = BackgroundScheduler()
sched.start()
sched.add_job(memory_intensive_job, 'interval', hours=1)
선택 가이드: schedule vs APScheduler
schedule을 선택해야 하는 경우
✅ 간단한 스케줄링: 몇 개의 작업만 실행 ✅ 학습 목적: Python 스케줄링 입문 ✅ 빠른 프로토타이핑: 테스트나 POC ✅ 가독성 우선: 코드를 읽기 쉽게 유지 ✅ 의존성 최소화: 가벼운 라이브러리 선호
예시 사용 사례
- 개인 프로젝트 자동화
- 간단한 알림 봇
- 데이터 수집 스크립트
APScheduler를 선택해야 하는 경우
✅ 복잡한 스케줄: Cron 표현식 필요 ✅ 동시 실행: 여러 작업 병렬 처리 ✅ 동적 관리: 실행 중 작업 추가/제거 ✅ 영구 저장: 재시작 후 작업 유지 ✅ 엔터프라이즈: 대규모 배치 시스템
예시 사용 사례
- 기업 배치 시스템
- 웹 애플리케이션 백그라운드 작업
- 복잡한 ETL 파이프라인
- 분산 작업 스케줄링
함께 사용하면 좋은 도구
Docker와 함께 사용하기
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY scheduler.py .
CMD ["python", "scheduler.py"]
# docker-compose.yml
version: '3.8'
services:
scheduler:
build: .
restart: always
volumes:
- ./logs:/app/logs
- ./data:/app/data
environment:
- TZ=Asia/Seoul
systemd 서비스로 등록 (Linux)
# /etc/systemd/system/my-scheduler.service
[Unit]
Description=Python Scheduler Service
After=network.target
[Service]
Type=simple
User=myuser
WorkingDirectory=/home/myuser/scheduler
ExecStart=/usr/bin/python3 /home/myuser/scheduler/main.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# 서비스 시작
sudo systemctl start my-scheduler
sudo systemctl enable my-scheduler
sudo systemctl status my-scheduler
로깅 통합
import logging
from logging.handlers import RotatingFileHandler
from apscheduler.schedulers.background import BackgroundScheduler
# 로깅 설정
logger = logging.getLogger('scheduler')
logger.setLevel(logging.INFO)
# 파일 핸들러 (10MB씩 최대 5개 파일)
file_handler = RotatingFileHandler(
'scheduler.log',
maxBytes=10*1024*1024,
backupCount=5,
encoding='utf-8'
)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def logged_job():
"""로깅이 적용된 작업"""
try:
logger.info("작업 시작")
# 실제 작업
result = perform_task()
logger.info(f"작업 완료: {result}")
except Exception as e:
logger.error(f"작업 실패: {str(e)}", exc_info=True)
raise
def perform_task():
# 실제 작업 내용
return "성공"
sched = BackgroundScheduler()
sched.start()
sched.add_job(logged_job, 'interval', minutes=10)
디버깅 팁
schedule 디버깅
import schedule
def debug_job():
print("작업 실행")
schedule.every(10).seconds.do(debug_job)
# 현재 등록된 모든 작업 확인
print("등록된 작업:", schedule.get_jobs())
# 다음 실행 시간 확인
for job in schedule.get_jobs():
print(f"작업: {job.job_func.__name__}")
print(f"다음 실행: {job.next_run}")
print(f"실행 간격: {job.interval} {job.unit}")
APScheduler 디버깅
from apscheduler.schedulers.background import BackgroundScheduler
import logging
# APScheduler 로깅 활성화
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
sched = BackgroundScheduler()
sched.start()
def debug_job():
print("디버그 작업")
sched.add_job(debug_job, 'interval', seconds=5, id='debug_job')
# 작업 정보 출력
job = sched.get_job('debug_job')
print(f"작업 ID: {job.id}")
print(f"다음 실행 시간: {job.next_run_time}")
print(f"트리거: {job.trigger}")
# 모든 작업 리스트
for job in sched.get_jobs():
print(f"- {job.id}: {job.next_run_time}")
자주 발생하는 문제와 해결법
문제 1: 작업이 실행되지 않음
원인: while 루프가 너무 빨리 실행됨
# ❌ 잘못된 예
import schedule
schedule.every(10).seconds.do(job)
# while 루프 없음 - 작업이 실행 안 됨
# ✅ 올바른 예
import schedule
import time
schedule.every(10).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1) # 1초마다 체크
문제 2: 시간대(Timezone) 문제
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import timezone
# ❌ 잘못된 예 - 서버 시간대 사용
sched = BackgroundScheduler()
# ✅ 올바른 예 - 명시적 시간대 지정
sched = BackgroundScheduler(timezone='Asia/Seoul')
# 또는 pytz 사용
korea_tz = timezone('Asia/Seoul')
sched = BackgroundScheduler(timezone=korea_tz)
문제 3: 중복 실행
from apscheduler.schedulers.background import BackgroundScheduler
sched = BackgroundScheduler()
sched.start()
# ❌ 잘못된 예 - 같은 ID로 여러 번 추가
sched.add_job(job, 'interval', seconds=10, id='my_job')
sched.add_job(job, 'interval', seconds=10, id='my_job') # 오류 발생!
# ✅ 올바른 예 - replace_existing 사용
sched.add_job(
job,
'interval',
seconds=10,
id='my_job',
replace_existing=True # 기존 작업 교체
)
문제 4: 메모리 누수
# ❌ 잘못된 예
def leaky_job():
global large_data
large_data = [i for i in range(10000000)]
# 메모리 해제 안 함
# ✅ 올바른 예
def clean_job():
large_data = [i for i in range(10000000)]
result = process(large_data)
# 명시적 메모리 정리
del large_data
import gc
gc.collect()
return result
마무리하며
Python 스케줄링은 자동화의 핵심 기술입니다. 간단한 작업에는 schedule을, 복잡한 시스템에는 APScheduler를 선택하세요.
핵심 요약
- schedule: 직관적, 간단, 빠른 시작
- APScheduler: 강력함, 유연함, 엔터프라이즈급
실무 적용 체크리스트
- ✅ 적절한 라이브러리 선택
- ✅ 에러 처리 및 로깅 구현
- ✅ 시간대 명시적 설정
- ✅ 재시도 로직 구현
- ✅ 모니터링 시스템 연동
- ✅ 작업 실행 이력 관리
스케줄링은 한 번 설정하면 끝이 아닙니다. 주기적으로 로그를 확인하고, 작업이 제대로 실행되는지 모니터링하며, 필요에 따라 스케줄을 조정하세요.
참고 자료
- schedule 공식 문서: https://schedule.readthedocs.io/
- APScheduler 공식 문서: https://apscheduler.readthedocs.io/
- Python 공식 문서 – datetime: https://docs.python.org/ko/3/library/datetime.html
- Cron 표현식 생성기: https://crontab.guru/