Laravelプロジェクト"間"でキューを使ってメッセージをやり取りする時のポイント

こんにちは!開発本部 SINIS for Instagram 開発チームのtenshin_yです。 今回は、Laravelプロジェクトと別のLaravelプロジェクトでキューを使って非同期処理を実装したため、連携する時のポイントをまとめようと思います。 なお、今回の実装例ではキューにAmazon SQSを使っています。

要点

Laravelプロジェクト間でキューを共有するために

  • Jobクラスの名前空間ディレクトリ構造を両プロジェクトで統一する
  • 同じSQS接続設定を各プロジェクトで共有する

キューの設定と実装

構成

複数のLaravelプロジェクトが共通のキュー(Amazon SQS、データベース、Redis等)を参照することで、メッセージを共有していきます。 以下のサンプルは全てAmazon SQSを利用していきます。

キュー接続の設定

各プロジェクトで同じSQS接続設定を行います。

config/queue.php

<?php
// 省略
'connections' => [
    'shared_sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
        'queue' => env('SQS_QUEUE', 'default'),
        'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
        'after_commit' => false,
    ],
],

.env ファイル

QUEUE_CONNECTION=sqs

AWS_ACCESS_KEY_ID=your_access_key_id
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_DEFAULT_REGION=us-east-1
SQS_PREFIX=https://sqs.us-east-1.amazonaws.com/123456789012
SQS_QUEUE=shared-queue

ジョブクラスの作成

共有キュー用のジョブを作成します。

ここでのポイントが以下になります。

  1. API側ではワーカーを立てないため、handle()メソッドの実装詳細は不要
  2. バッチ側は実装が必要
  3. キューに積むメッセージ内にはJobが定義されたファイルのパスが入るため、JobクラスのディレクトリをAPIとバッチで揃える必要がある

メッセージの例.

{
  "Messages": [
    {
      "MessageId": "94a9e27a-2706-4af1-a093-7f473ccb4dad",
      "ReceiptHandle": "94a9e27a-2706-4af1-a093-7f473ccb4dad#8047d91a-d49e-4f9b-adbc-679ffa8b44c2",
      "MD5OfBody": "8ae1c15cd88bcec79c2be71f2c594878",
      "Body": "{\"uuid\":\"81bf054f-ce67-4508-8c0c-48bada9fe47c\",\"displayName\":\"App\\\\Jobs\\\\SharedJob\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"maxExceptions\":null,\"failOnTimeout\":false,\"backoff\":null,\"timeout\":null,\"retryUntil\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\SharedJob\",\"command\":\"O:20:\\\"App\\\\Jobs\\\\SharedJob\\\":11:{s:24:\\\"\\u0000*\\u0000accountUuIds\\\";a:2:{i:0;s:17:\\\"17841466395182854\\\";i:1;s:17:\\\"17841464726021831\\\";}s:3:\\\"job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";s:13:\\\"neo-job-queue\\\";s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:19:\\\"chainCatchCallbacks\\\";N;s:5:\\\"delay\\\";N;s:11:\\\"afterCommit\\\";N;s:10:\\\"middleware\\\";a:0:{}s:7:\\\"chained\\\";a:0:{}}\"}}"
    }
  ]
}

API

API側ではJobを処理しないため、処理の実装がこちらでは必要がありません。

<?php

// 両方のLaravelプロジェクトで同じ名前空間とクラス構造にする

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class SharedJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;


    protected $accountUuIds;

    /**
     * @param array<int, string> $accountUuids
     */
    public function __construct(array $accountUuIds)
    {
        $this->accountUuIds = $accountUuIds;
    }


    public function handle(): void
    {
        // API側では処理は実行されないためinterfaceだけ揃える
    }
}

バッチ

こちらは実際に処理をする側のため、handle()メソッドに処理の内容が書かれます。

<?php

// 両方のLaravelプロジェクトで同じ名前空間とクラス構造にする

namespace App\Jobs;

use Log;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Infrastructure\EloquentModel\Account;

class SharedJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;


    protected $accountUuIds;

    /**
     * @param array<int, string> $accountUuids
     */
    public function __construct(array $accountUuIds)
    {
        $this->accountUuIds = $accountUuIds;
    }
    
    public function handle()
    {
        Log::info('SharedJob', [
            'accountUuIds' => $this->accountUuIds
        ]);

        $accounts = Account::query()
            ->whereIn('uuid', $this->accountUuIds)
            ->get();

        Log::info('SharedJob', [
            'accounts' => $accounts
        ]);
    }
}

特定の接続でのジョブディスパッチ

あとは、enqueueする側で設定したqueueを指してJobをdispatchするだけです。 ここでは特別なことは必要ないです。

use App\Jobs\SharedJob;

SharedJob::dispatch($uuids)
    ->onConnection('shared_sqs');

まとめ

PHPJavaScriptPromise.all()のような言語レベルでの並列非同期処理を持ちませんが、キューを活用することで、プロジェクト間での非同期処理が実現できます。 便利ですね。

要点おさらい:

  • Jobクラスの名前空間ディレクトリ構造を両プロジェクトで統一する
  • 同じSQS接続設定を各プロジェクトで共有する

テテマーチでは、一緒にサービスを作り上げてくれる仲間を募集中です。ご興味のある方は、以下のリンクからぜひご応募ください! herp.careers

エンジニアチームガイドはこちら! tetemarche01.notion.site