- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
4 l ^( O% I* s5 E# u - require_once __DIR__ . '/Workerman/Autoloader.php';% U9 H: W2 }% } m4 B- {
& l4 A+ V6 d. r6 D- M- $worker = new Worker();
9 t! O) a- k7 Y+ S% q - // 4个进程6 G6 T8 \/ g: W& q4 D, g
- $worker->count = 4;
! \9 Q# _$ k4 k) S - // 每个进程启动后在当前进程新增一个Worker监听( W. o2 n$ m# U( [/ B' u
- $worker->onWorkerStart = function($worker); Z8 B: J, j7 ~; B3 h# b: P5 b! L
- {
" v' k; k6 \7 C' i - /**
: r4 U$ a8 B. a. l; o! f$ h4 } - * 4个进程启动的时候都创建2016端口的Worker' L0 d9 O/ y" E# f; A
- * 当执行到worker->listen()时会报Address already in use错误
6 T* T. X1 d0 x+ H% v7 x$ r/ C - * 如果worker->count=1则不会报错7 u8 b' a0 J. y- B( i
- */
$ y2 d3 u& d7 Q9 U \5 R, P% r1 ` - $inner_worker = new Worker('http://0.0.0.0:2016');
6 k3 Y! {6 i+ [- P - $inner_worker->onMessage = 'on_message';
% _. @) c) R' g) T5 v3 M - // 执行监听。这里会报Address already in use错误5 m0 Y6 }7 V l6 {
- $inner_worker->listen();( x/ K: e' b; e5 b
- };, ]" B: n" N6 n1 i. U; f2 k
* C) B5 E+ q: D0 F o5 z- $worker->onMessage = 'on_message';# d, V" x- |- U: r. X
- $ [, ?7 c& h' s8 F( F
- function on_message($connection, $data)( y: {7 b/ Q* m' @6 _6 ~' s
- {
$ ?1 Z o _5 o2 ^6 z' f* @ - $connection->send("hello\n");
5 j. ~ I# ?, z5 J3 m+ |9 h - }
3 D( e: k3 J: Q/ | - 0 F0 {8 `0 S/ g9 B* t7 Z
- // 运行worker
, @$ @- S. ]. V; [# y9 \ - Worker::runAll();5 O% R/ e1 X2 G; \$ W3 u& S
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( T5 y. E0 I. y
- + [- n, S% f. T8 A8 E" Y
- use Workerman\Worker;
/ N9 [3 ^5 [/ x2 {! Y( B - require_once './Workerman/Autoloader.php';5 B0 i* b: Y+ [$ u1 s! u( `7 {" k; E1 ^
1 j7 p$ m! [* |1 |* V0 N- $worker = new Worker('text://0.0.0.0:2015');' E9 Q l3 R$ A0 Z/ E0 @+ @ E4 w2 m
- // 4个进程
& [- `: m. m5 R - $worker->count = 4;& K$ U: O: Q5 @! u% `
- // 每个进程启动后在当前进程新增一个Worker监听
2 O1 O( Z/ x4 y8 x6 ?3 j7 N: | - $worker->onWorkerStart = function($worker)7 g) F/ e: F# O5 C, n% U
- {
7 [- g) @6 i# M - $inner_worker = new Worker('http://0.0.0.0:2016');
4 E+ W/ j, f6 i. r/ H4 O - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
+ c. `- q) b9 w& i K. s1 l5 i - $inner_worker->reusePort = true;
. y3 |: }% N6 q" u1 D4 a - $inner_worker->onMessage = 'on_message';
# ]; z; S8 Z. H$ b5 Q( ^ O/ v3 C - // 执行监听。正常监听不会报错
% \6 u7 D* B& c1 y4 |/ b( x - $inner_worker->listen();
; X/ H. x, H& | - };
1 t/ v9 l2 p. J0 J: I( E, p
1 y. C, ^. |8 e- $worker->onMessage = 'on_message';
- d1 b$ J D5 T' I* h$ Q
, X4 l5 |& k( D- function on_message($connection, $data), h$ V, Z6 s$ l/ g, U
- {
& a* n6 z% p! u% }$ R. f8 x - $connection->send("hello\n");0 R9 z g9 M# d6 o
- }
3 ?. t' \: f! l( ^" V# R* @6 A3 J - ! R( `7 r! k. _1 x* L d8 W B( R
- // 运行worker! d' O7 R: k; U$ }# ^8 M7 _
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
8 I5 C) K) f. \+ ] - use Workerman\Worker;
- z( a' R9 Z+ P: I - require_once './Workerman/Autoloader.php';
. T# [! j; ^- b" i" v. ^* d2 h - // 初始化一个worker容器,监听1234端口* D8 D; f6 [8 l0 p. E
- $worker = new Worker('websocket://0.0.0.0:1234');8 `" d5 Z. B7 T# m
- 6 V: V1 c4 o) w
- /*
' H! b5 l+ ~! o* J, y2 x - * 注意这里进程数必须设置为1,否则会报端口占用错误
4 {5 K8 \ |! E - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true). L3 g4 m1 l2 D2 ~& F+ o
- */# v8 p) |; q2 P v, }" t q- j
- $worker->count = 1;6 |4 Y+ y4 d: U7 n3 K! D
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口( `. M( N, v/ T: j* u
- $worker->onWorkerStart = function($worker)2 |# M! ?+ Z. v( Y: v- `) S: }+ [4 c
- {0 v. g! c3 j& `6 K) X: H
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
' o* r/ b$ w! o& E - $inner_text_worker = new Worker('text://0.0.0.0:5678');+ P2 M$ \) i3 K4 f+ Q
- $inner_text_worker->onMessage = function($connection, $buffer): Q- ?7 u/ D" n
- {
6 R( z1 |: \5 m+ d* A/ |5 k - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
1 a+ f: |6 d4 I( |, a8 k" q* b - $data = json_decode($buffer, true);, i$ U9 i i5 }5 Z9 e$ V
- $uid = $data['uid'];
' I! c! w7 }( Q; U, n - // 通过workerman,向uid的页面推送数据
5 j& u+ Q1 ] W/ G - $ret = sendMessageByUid($uid, $buffer);1 p6 F$ ]9 O5 ^
- // 返回推送结果6 S' c0 t' @. g
- $connection->send($ret ? 'ok' : 'fail');' E/ W3 R' Z" y: I# A
- };0 K) J8 f1 L# C2 Z) Q
- // ## 执行监听 ##" y( a4 y6 n) m$ T3 }3 q; y
- $inner_text_worker->listen();
( D; j5 o9 o$ r9 [1 \. g - };, U5 z( E; S* v3 X% G6 _2 q1 Q4 u
- // 新增加一个属性,用来保存uid到connection的映射" r% |4 z- n$ j% @* ]2 I. l2 c
- $worker->uidConnections = array();! f& M6 @1 D2 w% B7 G4 p
- // 当有客户端发来消息时执行的回调函数
; Q: ~: I% m) _$ Q# V( F - $worker->onMessage = function($connection, $data)6 g0 q2 {/ W% V3 M$ ?
- {" k2 E' [9 B1 N
- global $worker;
! e% `$ u: b' y- g - // 判断当前客户端是否已经验证,既是否设置了uid
9 B1 H9 T5 G. l4 E3 D3 A - if(!isset($connection->uid))
+ f: e" o7 H5 w R' E! b$ g2 d1 _ - {
8 R6 A) a9 S0 G# y$ W - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
/ J! y7 B+ F' y6 S - $connection->uid = $data;
* ~5 K4 P( _6 G6 @$ P - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,4 P# K# ^; r1 j6 t& i( v8 B& r
- * 实现针对特定uid推送数据
2 S1 ]' }! ?. \/ w+ {# X( } - */7 l8 z4 j4 S) J9 u3 z
- $worker->uidConnections[$connection->uid] = $connection;+ T' u7 ?* J9 W
- return;
# T/ ^2 A$ H( e! w1 ? H# R( f - }
. y) P* \2 }: y - };: {' P0 x/ I& d3 p" h4 m
- 6 w# E; q. j2 x( ^0 Z' Z6 K" x
- // 当有客户端连接断开时, k( r8 Q1 ~7 K1 s& f& `+ |; e
- $worker->onClose = function($connection)8 z; ?1 \3 _8 Y3 g
- {
! |: f2 V5 \$ d6 v+ g; n' e; y - global $worker; G; C! ]$ v* o O6 g
- if(isset($connection->uid))
$ I& H* n8 V& g1 m6 P - {- ^* b& h) ~! s; I3 M4 Z+ r
- // 连接断开时删除映射
) Q6 [8 x4 |' o, ^. i - unset($worker->uidConnections[$connection->uid]);" j3 Y$ Z6 F; W- }8 @5 k) Q
- }
2 q7 n) N0 R( j - };6 w" `. h9 [) ` ]3 X" V' ^
- 9 V7 w! N q) U1 P
- // 向所有验证的用户推送数据) z/ l& d- r5 r. W9 t
- function broadcast($message)& _$ H2 V0 P9 P K; H, A
- {8 \% U6 e4 a6 y' E' B& E7 K
- global $worker;
, T3 C# E$ A' C - foreach($worker->uidConnections as $connection)" \' t: M2 j3 n) N6 O# y. u
- {# ?7 N8 {8 W! W
- $connection->send($message);
7 M6 G' ?' `6 b3 E. p& D+ t& E - }0 Y# U$ i5 n/ q7 Y6 p( S' G8 k
- }
9 [9 M0 [5 |# X w& I8 W
2 R0 d) A9 @5 Y4 O" h+ ]- a8 s+ {: z- // 针对uid推送数据
8 F0 a# \+ y7 y, W! I' B5 j - function sendMessageByUid($uid, $message): L0 i! J, T+ V6 `
- {& D3 L' g: G/ ]% x4 N1 Y9 N3 a. P
- global $worker; x. V' C! Z9 m! f- x
- if(isset($worker->uidConnections[$uid])), G# M6 [/ w; r8 Y
- {/ N4 F' e& ], V" \8 ]7 o3 y" v/ u
- $connection = $worker->uidConnections[$uid];
, k9 C, @! \9 f! L$ n% H# h4 A) O - $connection->send($message);
0 q+ P9 G1 `/ r) G0 H1 r8 n. i - return true;
2 X4 _* j5 r+ U/ R8 X - }: J V! e4 e' F" s
- return false;
3 c* B9 ?5 |# `& a/ w" t9 i+ u - }
0 w7 t: H" w2 j3 f9 c" ^0 w - + W M6 j" L# d1 I
- // 运行所有的worker
! B) T7 r2 Z2 ^! C$ ^- q, j - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
, s6 ]% F0 s1 C# A- G - ws.onopen = function(){
2 o, X- g- Q3 T, r' ? - var uid = 'uid1'; P( P# n) ^, \! V
- ws.send(uid);
# c6 d" o o! h/ v; A8 y - };
. c) `: T1 g! o% B+ |/ w - ws.onmessage = function(e){
% R$ ?' J9 Y( [3 ~/ l) p2 w - alert(e.data);
8 I: v# |3 z O3 m4 e1 W/ { - };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口) X* n E/ n( D; C6 w1 x
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
, |0 x6 D! k( F/ T( T* T# [- s& W' w7 O. w8 Q - // 推送的数据,包含uid字段,表示是给这个uid推送
) `' `7 T6 g# ?$ a - $data = array('uid'=>'uid1', 'percent'=>'88%');( D* @2 V C0 ~' ]* `$ x3 n
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符% t. E2 p) X" ^
- fwrite($client, json_encode($data)."\n");1 O/ j5 B# E( @1 i" q
- // 读取推送结果& _' E* U0 L( x" d' v
- echo fread($client, 8192);
复制代码
K9 a5 h2 U) Y+ y, g- Q
8 e5 s _( D; B# V8 v; B. p |