« 上一篇

一个多进程管理类​

PHP实现多线程比较麻烦,做多进程相对简单,在多任务脚本时很有用,可以有效提升脚本执行效率

Process.class 多线程管理类

class Process {
    
    private $processNum = 2;
    private $timeout = 3000;//程序预计执行时间,超过此设置时间没有执行完,将不再等待进程回收
    private $execArr = array();
    
    public function execute()
    {
        //$log = Log::instance();
        //判断pcntl扩展是否开启
        if ( ! function_exists('pcntl_fork'))
        {
            //$log->add('ERR', 'pcntl扩展未开启!');
            die('pcntl扩展未开启!');
        }
        
        //创建管道
        $sPipePath = 'my_pipe'.posix_getpid();
        if ( ! posix_mkfifo($sPipePath, 0666))
        {
            die('创建管道'.$sPipePath.'错误');
        }
        //Core_Worm::fileLog('pcntl start!', 'w+');
        for ($i = 0; $i < $this->getpNum(); $i ++)
        {
            $nPID = pcntl_fork();//创建一个子进程
            if ($nPID == 0)
            {
                echo $i."\n";
                //子进程过程
                if (empty($this->execArr))
                {
                    die('进程执行数组为空,');
                }
                else
                {
                    call_user_func($this->execArr[$i]);
                }
                
                $oW = fopen($sPipePath, 'w');
                fwrite($oW, $i."\n"); // 当前任务处理完比,在管道中写入数据
                fclose($oW);
                exit(0);
            }
            sleep(3);//每启动一个进程休眠3秒,避免一次进程启动太多造成redis或者mysql读写出错
        }
        //父进程
        $fpid = fopen($sPipePath, 'r');
        stream_set_blocking($fpid, FALSE);//将管道设置为非堵塞,用于适应超时机制
        $sData  = ''; //存放管道中的数据
        $nLine  = 0;
        $nStart = time();
        while ($nLine < $this->getpNum() && (time() - $nStart) < $this->getTimeout())
        {
            $sLine = fread($fpid, 1024);
            if (empty($sLine))
            {
                continue;
            }
            echo 'current line:'.$sLine;
            //用于分析多少任务处理完毕,通过'\n'标识
            foreach ($arr = str_split($sLine) as $c)
            {
                //echo Debug::vars($arr);
                if ("\n" == $c)
                {
                    ++ $nLine;
                }
            }
            $sData .= $sLine;
        }
        
        echo "Final line count:$nLine\n";
        fclose($fpid);
        unlink($sPipePath);
        
        //等待子进程执行完毕,避免僵尸进程
        $n = 0;
        while ($n < $this->getpNum())
        {
            $nStatus = - 1;
            $nPID    = pcntl_wait($nStatus, WNOHANG);
            if ($nPID > 0)
            {
                echo "{$nPID} exit \n";
                ++ $n;
            }
        }
        
        //验证结果,主要看结果中是否每个任务都完成了
        $arr2 = array();
        foreach (explode("\n", $sData) as $i)
        {
            if (is_numeric(trim($i)))
            {
                array_push($arr2, $i);
            }
        }
        $arr2 = array_unique($arr2);
        if (count($arr2) == $this->getpNum())
        {
            echo "ok\n";
        }
        else
        {
            echo "error count ".count($arr2)."\n";
            var_dump($arr2);
        }
    }
    
    public function setExecArr($array)
    {
        $this->execArr = $array;
    }
    
    public function setTimeout($secends)
    {
        $this->timeout = $secends;
    }
    
    public function getTimeout()
    {
        return $this->timeout;
    }
    
    public function setpNum($num)
    {
        $this->processNum = $num;
    }
    
    public function getpNum()
    {
        return $this->processNum;
    }
}

调用执行,test.php

//实例化进程管理类
$process = new Process();
//创建五个子进程
$process->setpNum(5);
$process->setTimeout(1200);
//为每个进程分配执行程序
$process->setExecArr([
    function (){
       //执行程序1
    }
    , function (){
        //执行程序2
    },
    function (){
        //执行程序3
    },
    function (){
        //执行程序4
    },
    function (){
        //执行程序5
    }
]);
//调用执行
$process->execute();