A Coding Guide to Build a Production-Grade Background Task Processing System Using Huey with SQLite, Scheduling, Retries, Pipelines, and Concurrency Control


# Consumer settings gathered in one mapping so the tuning knobs are easy to scan.
consumer_kwargs = {
    "workers": 4,
    "worker_type": WORKER_THREAD,
    "periodic": True,
    "initial_delay": 0.1,
    "backoff": 1.15,
    "max_delay": 2.0,
    "scheduler_interval": 1,
    "check_worker_health": True,
    "health_check_interval": 10,
    "flush_locks": False,
}
consumer = huey.create_consumer(**consumer_kwargs)


# Run the consumer loop on a daemon thread so this thread stays free to
# enqueue work and wait on results.
# NOTE(review): consumer.run executes off the main thread here -- confirm that
# Huey's startup (e.g. any signal-handler registration) is safe in that setting.
consumer_thread = threading.Thread(daemon=True, target=consumer.run)
consumer_thread.start()
print("Consumer started (threaded).")


print("\nEnqueue basics...")
# Enqueue both tasks first, then wait on each handle in turn (up to 5s apiece).
r1 = quick_add(10, 32)
r2 = slow_io(0.75)
for label, handle in (("quick_add result:", r1), ("slow_io result:", r2)):
    print(label, handle(timeout=5, blocking=True))


print("\nRetries + priority demo (flaky task)...")
rf = flaky_network_call(p_fail=0.7)
# Wait up to 10s for the outcome; any exception raised while waiting is
# reported rather than propagated, so the rest of the demo keeps running.
try:
    flaky_value = rf(blocking=True, timeout=10)
except Exception as exc:
    print("flaky_network_call failed even after retries:", repr(exc))
else:
    print("flaky_network_call result:", flaky_value)


print("\nContext task (task id inside payload)...")
# Enqueue the estimate with 150,000 samples and block (up to 20s) for its payload.
rp = cpu_pi_estimate(samples=150_000)
pi_payload = rp(timeout=20, blocking=True)
print("pi payload:", pi_payload)


print("\nLocks demo: enqueue multiple locked jobs quickly (should serialize)...")
# All three jobs are enqueued before any result is awaited; results are then
# collected in enqueue order, waiting up to 10s on each handle.
locked_results = [locked_sync_job(tag=f"run{i}") for i in range(3)]
locked_outputs = [handle(timeout=10, blocking=True) for handle in locked_results]
print(locked_outputs)


print("\nScheduling demo: run slow_io in ~3 seconds...")
# Schedule slow_io(0.25) with a 3-second delay, show the handle, then block
# (up to 10s) until its value is available.
rs = slow_io.schedule(delay=3, args=(0.25,))
print("scheduled handle:", rs)
scheduled_value = rs(timeout=10, blocking=True)
print("scheduled slow_io result:", scheduled_value)


print("\nRevoke demo: schedule a task in 5s then revoke before it runs...")
rv = slow_io.schedule(args=(0.1,), delay=5)
rv.revoke()
# Sleep past the scheduled time so the consumer has had the chance to reach
# the task before the result store is inspected.
time.sleep(6)
try:
    # A non-blocking read yields None when no result has been stored; it does
    # not raise, so the "no result" case must be detected from the value.
    out = rv(blocking=False)
except Exception as e:
    print("revoked task did not produce result (expected):", type(e).__name__, str(e)[:120])
else:
    if out is None:
        print("revoked task did not produce result (expected): no result stored")
    else:
        # A stored value means the task ran despite the revoke call.
        print("revoked task output:", out)


print("\nPipeline demo...")
# Build the chain one stage at a time: fetch_number(123), then
# transform_number with the extra argument 5, then store_result.
pipeline = fetch_number.s(123)
pipeline = pipeline.then(transform_number, 5)
pipeline = pipeline.then(store_result)
pipe_res = huey.enqueue(pipeline)
# NOTE(review): enqueueing a pipeline may hand back a group of per-step
# results rather than only the last one -- confirm what is printed here.
pipeline_output = pipe_res(timeout=10, blocking=True)
print("pipeline final result:", pipeline_output)


print("\nStarting 15-second heartbeat demo for ~40 seconds...")
start_seconds_heartbeat(interval_sec=15)
try:
    # Leave the heartbeat running for the demo window.
    time.sleep(40)
finally:
    # Always pair the start with a stop, even if the sleep is interrupted
    # (e.g. Ctrl-C / notebook interrupt), so the heartbeat does not keep running.
    stop_seconds_heartbeat()
print("Stopped 15-second heartbeat demo.")


# Dump the 12 most recent events before the consumer is shut down.
print_latest_events(12)


print("\nStopping consumer gracefully...")
consumer.stop(graceful=True)
# join() returns silently when its timeout expires, so the thread's liveness
# is checked afterwards to report the real outcome.
consumer_thread.join(timeout=5)
if consumer_thread.is_alive():
    print("Consumer thread still running after 5s; it is a daemon thread and will exit with the process.")
else:
    print("Consumer stopped.")



Source link

  • Related Posts

    Top 19 AI Red Teaming Tools (2026): Secure Your ML Models

    What Is AI Red Teaming? AI Red Teaming is the process of systematically testing artificial intelligence systems—especially generative AI and machine learning models—against adversarial attacks and security stress scenarios. Red…

    Qwen Team Open-Sources Qwen3.6-35B-A3B: A Sparse MoE Vision-Language Model with 3B Active Parameters and Agentic Coding Capabilities

    The open-source AI landscape has a new entry worth paying attention to. The Qwen team at Alibaba has released Qwen3.6-35B-A3B, the first open-weight model from the Qwen3.6 generation, and it…

    Leave a Reply

    Your email address will not be published. Required fields are marked *