fucongcong/framework

View on GitHub
core/Group/Queue/Bear.php

Summary

Maintainability
A
1 hr
Test Coverage
<?php
// Core of the queue service.
namespace Group\Queue;
 
use swoole_process;
use Group\Queue\TubeTick;
use Pheanstalk\Pheanstalk;
use Group\Cache\BootstrapClass;
 
class Bear
{
/** @var string Directory read from config "queue::log_dir"; holds the pid file and the 'work_ids' cache. */
protected $logDir;

/** @var string Value of config "queue::class_cache"; handed to BootstrapClass and then require'd. */
protected $classCache;

/** @var int Number of worker processes to start, as computed by setWorkNum(). */
protected $workerNum;

/** @var array Pids of the worker processes started so far; persisted under the 'work_ids' cache key. */
protected $workerPids;

/** @var array Map of worker pid => ['process' => swoole_process, 'tube' => tube entry]. */
protected $workers;

/** @var array Result of TubeListener::getTubes(); indexed by worker index in startWorkers(). */
protected $tubes;

/** @var Pheanstalk[] Connections whose server answered the listening check in initPheanstalk(). */
protected $pheanstalk;

/**
 * Was declared as "$linstener" while every use in this class reads and writes
 * "$this->listener"; the declaration now matches the name actually used.
 *
 * @var TubeListener
 */
protected $listener;

/** @var array Raw value of config "queue::server": one host/port pair or a list of such pairs. */
protected $server;

/** @var mixed Value of config "queue::timer"; passed through intval() to swoole_timer_tick(). */
protected $timer;
 
/**
 * Create the queue service and load its configuration.
 *
 * @param mixed $loader Class loader, forwarded unchanged to initParam().
 */
public function __construct($loader)
{
    $this->initParam($loader);
}
 
/**
 * Start the queue service.
 *
 * Runs the already-running check, daemonizes the current process, then
 * installs the signal handlers, starts the workers, attaches the tube ticker
 * and finally writes the pid file.
 */
public function start()
{
    $this->checkStatus();
    \Log::info("异步队列服务启动", [], 'queue.bear');

    // Turn the master process into a daemon.
    swoole_process::daemon(true);

    // Install the signal handlers before any worker exists.
    $this->setSignal();

    // Start the worker processes.
    $this->startWorkers();

    // Attach the queue listener to the started workers.
    $this->bindTubeTick();

    $this->setPid();
}
 
/**
 * Restart the queue service: stop it, pause, then start it again.
 */
public function restart()
{
    $this->stop();

    // Fixed three-second pause between stopping and starting.
    sleep(3);

    $this->start();
}
 
/**
 * Stop the queue service.
 *
 * When the master process recorded in the pid file is still alive, sends
 * SIGTERM to every worker pid stored under the 'work_ids' cache key.
 */
public function stop()
{
    $pid = $this->getPid();
    if (empty($pid)) {
        return;
    }

    // Signal 0 delivers nothing; it is used here only as a liveness probe.
    if (!swoole_process::kill($pid, 0)) {
        return;
    }

    $workerPids = \FileCache::get('work_ids', $this->logDir."/");
    if (!is_array($workerPids)) {
        // Nothing cached, so there is no worker to terminate.
        return;
    }

    foreach ($workerPids as $workerPid) {
        try {
            swoole_process::kill($workerPid, SIGTERM);
        } catch (\Exception $e) {
            // Fully qualified on purpose: a bare "Exception" would resolve to
            // Group\Queue\Exception inside this namespace and never match.
            \Log::info("进程{$workerPid}不存在", [], 'queue.stop');
        }
    }
}
 
/**
 * Read the master process pid from the pid file.
 *
 * @return int|null The recorded pid, or null when the pid file is missing or cannot be read.
 */
public function getPid()
{
    $pidFile = $this->logDir."/pid";
    if (!file_exists($pidFile)) {
        return null;
    }

    $contents = file_get_contents($pidFile);
    if ($contents === false) {
        return null;
    }

    // Cast so callers receive the documented int instead of the raw file text.
    return (int) trim($contents);
}
 
/**
 * Write the current process pid to the pid file, creating the log directory if needed.
 *
 * The file path is built exactly as getPid() builds it, so both always refer
 * to the same file.
 */
public function setPid()
{
    $pidFile = $this->logDir."/pid";
    $dir = dirname($pidFile);

    // Recursive mkdir creates every missing parent directory in one call.
    if (!is_dir($dir)) {
        mkdir($dir, 0777, true);
    }

    file_put_contents($pidFile, posix_getpid());
}
 
/**
 * Install the SIGCHLD handler.
 *
 * Each exited worker is reaped and counted. Once the count reaches workerNum
 * the 'work_ids' and pid files are removed and the master is killed.
 *
 * SIGUSR1 is deliberately left unhandled; the original code only carried a
 * commented-out placeholder reserving it for a user-defined restart hook.
 */
private function setSignal()
{
    // SIGCHLD is delivered when a child process terminates.
    swoole_process::signal(SIGCHLD, function ($signo) {
        // Static so the tally survives across separate handler invocations.
        static $exitedWorkers = 0;

        // wait(false) must stay non-blocking: reap whatever has already exited, then return.
        while ($exited = swoole_process::wait(false)) {
            $exitedWorkers++;
            \Log::info("PID={$exited['pid']}worker进程退出!", [], 'queue.bear');

            if ($exitedWorkers < $this->workerNum) {
                continue;
            }

            // Resolve the master pid BEFORE the pid file is deleted; reading it
            // afterwards yields null and kill() would receive no usable pid.
            // The handler is registered from start(), so the current pid is a
            // fallback for the master.
            $masterPid = $this->getPid() ?: posix_getpid();

            // Remove the state files, tolerating ones that are already gone.
            foreach (["/work_ids", "/pid"] as $stateFile) {
                if (file_exists($this->logDir.$stateFile)) {
                    unlink($this->logDir.$stateFile);
                }
            }

            \Log::info("主进程退出!", [], 'queue.bear');
            swoole_process::kill($masterPid, SIGKILL);
        }
    });
}
 
/**
 * Start workerNum worker processes.
 *
 * Every started process is recorded twice: its pid goes through
 * setWorkerPids(), and $this->workers maps the pid to the process object and
 * the tube entry sharing the worker's index.
 */
private function startWorkers()
{
    $index = 0;
    while ($index < $this->workerNum) {
        // NOTE(review): the second argument is presumably Swoole's stdin/stdout redirect flag; confirm.
        $worker = new swoole_process([$this, 'workerCallBack'], true);
        $pid = $worker->start();

        $this->setWorkerPids($pid);
        $this->workers[$pid] = [
            'process' => $worker,
            'tube' => $this->tubes[$index],
        ];

        $index++;
    }
}
 
/**
 * Entry point of a worker process.
 *
 * Bootstraps the application, then reacts to data arriving on the worker's
 * pipe by handing it to handleJob() for each configured server. The worker
 * exits when it receives SIGTERM.
 *
 * @param swoole_process $worker The worker process this callback runs in.
 */
public function workerCallBack(swoole_process $worker)
{
    $this->init();

    $server = $this->server;
    $listener = $this->listener;
    $timer = $this->timer;

    // Runs whenever the worker's pipe becomes readable.
    $onPipeReadable = function ($pipe) use ($worker, $server, $listener, $timer) {
        $recv = $worker->read();

        // A single server is configured as one host/port pair.
        if (isset($server['host'], $server['port'])) {
            $this->handleJob($server, $recv, $listener, $timer);
            return;
        }

        // Otherwise the config is treated as a list of servers.
        foreach ($server as $one) {
            $this->handleJob($one, $recv, $listener, $timer);
        }
    };
    swoole_event_add($worker->pipe, $onPipeReadable);

    // Exit the worker on SIGTERM (the signal stop() sends).
    swoole_process::signal(SIGTERM, function ($signo) use ($worker) {
        $worker->exit();
    });
}
 
/**
 * Poll one beanstalkd server on a timer and run the handlers of each job found.
 *
 * Every tick asks the listener for a job. A job payload is expected to
 * unserialize to an array with a 'job' object and a 'handle' map keyed by
 * handler class. Each handler is constructed with the job id and data and its
 * handle() is called; the job is deleted only when all handlers finish
 * without throwing.
 *
 * NOTE(review): each call registers a new timer; confirm callers invoke this
 * once per server rather than once per pipe message.
 *
 * @param array $server   Server config with 'host' and 'port'.
 * @param mixed $recv     Data read from the worker pipe, forwarded to the listener.
 * @param mixed $listener Object providing getJob($recv, $pheanstalk).
 * @param mixed $timer    Tick interval; converted with intval().
 */
private function handleJob($server, $recv, $listener, $timer)
{
    $pheanstalk = new Pheanstalk($server['host'], $server['port'], 10);

    swoole_timer_tick(intval($timer), function ($timerId) use ($recv, $listener, $pheanstalk) {
        $raw = $listener->getJob($recv, $pheanstalk);
        if (!$raw) {
            return;
        }

        // NOTE(review): unserialize() on queue payloads can instantiate arbitrary
        // classes; only safe while every producer writing to the queue is trusted.
        $payload = unserialize($raw);

        // unserialize() returns false on a corrupt payload; never index into that.
        if (!is_array($payload) || !isset($payload['job']) || !is_object($payload['job'])) {
            return;
        }
        $job = $payload['job'];

        try {
            // Only the keys (handler class names) are used; the values are ignored.
            foreach ($payload['handle'] as $handlerClass => $handlerOptions) {
                $handler = new $handlerClass($job->getId(), $job->getData());
                $handler->handle();
            }

            // Open question kept from the original author: should deleting (or
            // releasing/burying) the job be left to the user's handler instead?
            $pheanstalk->delete($job);
        } catch (\Exception $e) {
            \Log::error(
                "jobId:".$job->getId()."任务出错了!",
                [
                    'jobId' => $job->getId(),
                    'jobData' => $job->getData(),
                    'message' => $e->getMessage(),
                ],
                'queue.worker'
            );
        }
    });
}
 
/**
 * Record a worker pid and persist the full pid list under the 'work_ids' cache key.
 *
 * @param int $pid Pid of the worker process that was just started.
 */
private function setWorkerPids($pid)
{
    $this->workerPids[] = $pid;

    $cacheDir = $this->logDir."/";
    \FileCache::set('work_ids', $this->workerPids, $cacheDir);
}
 
/**
 * Create the tube ticker for the started workers and the connected servers, then run it.
 */
private function bindTubeTick()
{
    (new TubeTick($this->workers, $this->pheanstalk))->work();
}
 
/**
 * Load the queue configuration and prepare everything start() needs.
 *
 * Reads the "queue::*" config entries, checks each configured beanstalkd
 * server, creates the tube listener, derives the worker count and tubes from
 * it, and builds the class cache for the job handlers.
 *
 * @param mixed $loader Class loader, forwarded to bootstrapClass().
 */
private function initParam($loader)
{
    $this->logDir = \Config::get("queue::log_dir");
    \Log::$cacheDir = $this->logDir;

    $this->classCache = \Config::get("queue::class_cache");
    $this->server = \Config::get("queue::server");

    // A lone host/port pair is wrapped so both config shapes share one loop.
    $isSingleServer = isset($this->server['host']) && isset($this->server['port']);
    $servers = $isSingleServer ? [$this->server] : $this->server;
    foreach ($servers as $one) {
        $this->initPheanstalk($one['host'], $one['port']);
    }

    // Start listening for queue jobs.
    $this->listener = new TubeListener();
    $this->workerNum = $this->setWorkNum($this->listener->getJobs());
    $this->tubes = $this->listener->getTubes();

    $this->timer = \Config::get("queue::timer");

    $this->bootstrapClass($loader, $this->listener->getJobs());
}
 
/**
 * Connect to one beanstalkd server and keep the connection if the server is listening.
 *
 * A server that is not listening is reported on stdout and skipped.
 *
 * @param string $host Server host.
 * @param int    $port Server port.
 */
private function initPheanstalk($host, $port)
{
    $connection = new Pheanstalk($host, $port, 10, true);

    if ($connection->getConnection()->isServiceListening()) {
        $this->pheanstalk[] = $connection;
        return;
    }

    echo("beanstalkd队列服务器连接失败,地址:{$host}:{$port}");
}
 
/**
 * Build the class cache for every job handler class and load it.
 *
 * @param mixed $loader Class loader handed to BootstrapClass.
 * @param array $jobs   Jobs as returned by the listener; each entry maps handler class => options.
 */
private function bootstrapClass($loader, $jobs)
{
    $bootstrap = new BootstrapClass($loader, $this->classCache);

    foreach ($jobs as $handlers) {
        // Only the handler class names (the keys) are registered.
        foreach ($handlers as $handlerClass => $handlerOptions) {
            $bootstrap->setClass($handlerClass);
        }
    }

    $bootstrap->bootstrap();

    require $this->classCache;
}
 
/**
 * Compute the total number of workers to start.
 *
 * For each job the largest 'task_worker_num' among its handlers is taken;
 * the result is the sum of those per-job maxima.
 *
 * @param array $jobs Jobs; each entry is a list/map of handler options containing 'task_worker_num'.
 *
 * @return int Total worker count.
 */
private function setWorkNum($jobs)
{
    $total = 0;

    foreach ($jobs as $handlers) {
        $peak = 0;
        foreach ($handlers as $options) {
            if ($options['task_worker_num'] > $peak) {
                $peak = $options['task_worker_num'];
            }
        }
        $total += $peak;
    }

    return $total;
}
 
/**
 * Terminate the script when the process recorded in the pid file is still alive.
 */
private function checkStatus()
{
    if (!$this->getPid()) {
        return;
    }

    // Signal 0 is used purely as a liveness probe of the recorded pid.
    if (!swoole_process::kill($this->getPid(), 0)) {
        return;
    }

    exit('队列服务已启动!');
}
 
/**
 * Bootstrap the application inside a worker process.
 *
 * Resets opcache when available, loads the Composer autoloader, then
 * initialises the App, registers its services and sets the container's app path.
 */
private function init()
{
    if (function_exists("opcache_reset")) {
        opcache_reset();
    }

    $loader = require __ROOT__.'/vendor/autoload.php';
    $loader->setUseIncludePath(true);

    $app = new \Group\App\App();
    $app->initSelf();
    $app->registerServices();

    $container = $app->singleton('container');
    $container->setAppPath(__ROOT__);
}
}