php laravel v5.1 消息队列

* install https://laravel.com/docs/5.1#installation

composer create-project laravel/laravel msgq "5.1.*"


* 配置好redis
  参照这里 https://www.cnblogs.com/mingzhanghui/p/9338385.html

* 修改.env 指定redis作为队列的驱动

QUEUE_DRIVER=redis

  

* 改队列的配置文件 config/queue.php

'default' => env('QUEUE_DRIVER', 'redis'),  // 默认是sync 改为redis

  下面的这块保持默认

        'redis' => [
            'driver'     => 'redis',
            'connection' => 'default',
            'queue'      => 'default',
            'expire'     => 60,
        ],

  清除缓存

php artisan cache:clear
php artisan config:clear

 * 创建Controller

php artisan make:controller DefaultController

  

<?php

namespace App\Http\Controllers;

use App\Commands\SendEmail;
use Illuminate\Http\Request;
use Illuminate\Queue\Capsule\Manager as Queue;

use App\Http\Requests;
use Illuminate\Support\Facades\Redis;

/**
 * Demo controller for the queue tutorial.
 *
 * Only index() does real work: it pushes a batch of SendEmail jobs onto the
 * queue. The remaining resource methods are empty scaffolding.
 */
class DefaultController extends Controller
{

    public function __construct() {}

    /**
     * Queue 100 SendEmail jobs with the messages "sss0" .. "sss99".
     *
     * The jobs go through the application's own queue configuration
     * (QUEUE_DRIVER in .env and config/queue.php) instead of a hand-built
     * Capsule manager, so the settings made earlier in this tutorial are the
     * ones actually used.
     *
     * NOTE(review): dispatch() is assumed to come from the DispatchesJobs
     * trait on the stock App\Http\Controllers\Controller -- confirm the base
     * controller still uses that trait.
     *
     * @return void
     */
    public function index()
    {
        for ($i = 0; $i < 100; $i++) {
            $msg = 'sss'.$i;
            $this->dispatch(new SendEmail($msg));
        }
    }

    /**
     * Show the form for creating a new resource.
     *
     * @return \Illuminate\Http\Response
     */
    public function create()
    {
        //
    }

    /**
     * Store a newly created resource in storage.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function store(Request $request)
    {
        //
    }

    /**
     * Display the specified resource.
     *
     * @param  int  $id
     * @return \Illuminate\Http\Response
     */
    public function show($id)
    {
        //
    }

    /**
     * Show the form for editing the specified resource.
     *
     * @param  int  $id
     * @return \Illuminate\Http\Response
     */
    public function edit($id)
    {
        //
    }

    /**
     * Update the specified resource in storage.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  int  $id
     * @return \Illuminate\Http\Response
     */
    public function update(Request $request, $id)
    {
        //
    }

    /**
     * Remove the specified resource from storage.
     *
     * @param  int  $id
     * @return \Illuminate\Http\Response
     */
    public function destroy($id)
    {
        //
    }
}

  

* 生成命令

php artisan make:command SendEmail --queued

<?php

namespace App\Commands;
use Illuminate\Console\Command;

use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldQueue;

/**
 * Queued demo job that prints a message together with a timestamp.
 *
 * NOTE(review): the imported base class is the Artisan console Command; a
 * queued job would normally extend the application's own job/command base
 * class -- confirm which one is intended.
 */
class SendEmail extends Command implements SelfHandling, ShouldQueue
{
    use InteractsWithQueue, SerializesModels;

    /**
     * Text printed when the job runs.
     *
     * @var mixed
     */
    protected $msg;

    /**
     * Create a new command instance.
     *
     * @param  mixed  $msg  Message to print in handle().
     * @return void
     */
    public function __construct($msg)
    {
        $this->msg = $msg;
    }

    /**
     * Execute the command.
     *
     * Waits four seconds, prints the message, a tab and the current
     * date/time on one line, then calls delete() on itself.
     *
     * @return void
     */
    public function handle()
    {
        // Artificial delay so the listeners' progress is visible in the console.
        sleep(4);
        echo $this->msg.'	'.date('Y-m-d H:i:s').PHP_EOL;
        // delete() is not defined in this class; presumably it comes from
        // InteractsWithQueue and removes the finished job from the queue.
        $this->delete();
    }
}

  

* 修改routes  ./app/Http/routes.php

// GET / is handled by DefaultController@index; the route is named "index".
Route::get('/', ['as' => 'index', 'uses' => 'DefaultController@index']);

  

* 监听queue

php artisan queue:listen

  

* 启动服务

php artisan serve --port 8080

  

打开浏览器,访问http://localhost:8080/页面。当然也可以用nginx,apache之类的。但是需要各种配置,还是内建的使用方便。

在控制台就能看到各个队列任务执行的情况了,如下图。如果同时在三个终端中分别运行 php artisan queue:listen,可以看到100个任务被三个 worker 平分了。

 

* 清空数据库

drop database if exists laravel;
create database laravel charset utf8 collate utf8_general_ci;

* 消息生成

php artisan make:job QueuedTest --queued

  => ./app/Jobs/QueuedTest.php

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
// NOTE(review): Dispatchable may not exist in the Laravel 5.1 install used
// earlier in this article -- confirm against the installed framework version.
use Illuminate\Foundation\Bus\Dispatchable;

/**
 * Minimal queued job used to check that the queue works end to end.
 */
class QueuedTest implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Execute the job by printing a fixed success message.
     *
     * @return void
     */
    public function handle()
    {
        echo "Queue test success";
    }
}
  (以上为 app/Jobs/QueuedTest.php 的内容)

* 创建数据库消息队列的数据表迁移文件

php artisan queue:table

  => database/migrations/2018_07_21_033228_create_jobs_table.php

<?php

use Illuminate\Support\Facades\Schema;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;

/**
 * Migration for the `jobs` table used by the database queue driver.
 */
class CreateJobsTable extends Migration
{
    /**
     * Run the migrations: create the `jobs` table.
     *
     * @return void
     */
    public function up()
    {
        Schema::create('jobs', function (Blueprint $table) {
            $table->bigIncrements('id');
            // Indexed queue name.
            $table->string('queue')->index();
            $table->longText('payload');
            $table->unsignedTinyInteger('attempts');
            // The three time columns are unsigned integers; only reserved_at
            // may be NULL.
            $table->unsignedInteger('reserved_at')->nullable();
            $table->unsignedInteger('available_at');
            $table->unsignedInteger('created_at');
        });
    }

    /**
     * Reverse the migrations: drop the `jobs` table if it exists.
     *
     * @return void
     */
    public function down()
    {
        Schema::dropIfExists('jobs');
    }
}

  (以上为迁移文件)

* 执行迁移

php artisan migrate

  => 数据表结构 jobs表

-- MySQL dump 10.16  Distrib 10.1.31-MariaDB, for osx10.6 (i386)
--
-- Host: localhost    Database: laravel
-- ------------------------------------------------------
-- Server version    10.1.31-MariaDB

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Table structure for table `jobs`
--

DROP TABLE IF EXISTS `jobs`;
/*!40101 SET @saved_cs_client     = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) NOT NULL,
  `payload` longtext NOT NULL,
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  PRIMARY KEY (`id`),
  KEY `jobs_queue_index` (`queue`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
/*!40101 SET character_set_client = @saved_cs_client */;

--
-- Dumping data for table `jobs`
--

LOCK TABLES `jobs` WRITE;
/*!40000 ALTER TABLE `jobs` DISABLE KEYS */;
/*!40000 ALTER TABLE `jobs` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2018-07-21 11:39:29

  (以上为 jobs.sql 的内容)

 假设数据库名为 laravel, 导出这个表

mysqldump -uroot -hlocalhost -p --databases laravel --tables jobs > jobs.sql

  创建controller

php artisan make:controller WelcomeController

  

php artisan serve --port 8080

 app/Commands/SendEmail.php

<?php

namespace App\Commands;

use App\Commands\Command;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldQueue;

/**
 * Queued demo job that prints a message together with a timestamp.
 *
 * NOTE(review): relies on an App\Commands\Command base class that is not
 * shown in this article -- confirm it exists in the project.
 */
class SendEmail extends Command implements SelfHandling, ShouldQueue
{
    use InteractsWithQueue, SerializesModels;

    /**
     * Text printed when the job runs.
     *
     * @var mixed
     */
    protected $msg;

    /**
     * Create a new command instance.
     *
     * @param  mixed  $msg  Message to print in handle().
     * @return void
     */
    public function __construct($msg)
    {
        $this->msg = $msg;
    }

    /**
     * Execute the command.
     *
     * Waits four seconds, prints the message, a tab and the current
     * date/time on one line, then calls delete() on itself.
     *
     * @return void
     */
    public function handle()
    {
        // Artificial delay so the listeners' progress is visible in the console.
        sleep(4);
        echo $this->msg.'	'.date('Y-m-d H:i:s').PHP_EOL;
        // delete() is not defined in this class; presumably it comes from
        // InteractsWithQueue and removes the finished job from the queue.
        $this->delete();
    }
}

  

https://blog.csdn.net/chen529834149/article/details/76918406

http://laravelacademy.org/post/2012.html

原文地址:https://www.cnblogs.com/mingzhanghui/p/9346144.html