- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;' n1 S% s4 B" p" H+ [
- require_once __DIR__ . '/Workerman/Autoloader.php';
1 d/ O& h, j: n8 }* y U& L
6 `, G7 U+ l$ N9 w* I b4 |- $worker = new Worker();* I' w' m4 r4 ?
- // 4个进程
1 N" I @2 J0 H9 E0 ` - $worker->count = 4;! F& |& I7 L( L6 L% ]. y
- // 每个进程启动后在当前进程新增一个Worker监听
' L2 F( k' B# v/ D0 k! c' ~) c - $worker->onWorkerStart = function($worker)
2 B0 n$ e% s4 z6 F( Y/ ] - {+ X B P: l# _4 S
- /**
4 y% o( \% n/ @% _' J2 \' X0 M+ p( n - * 4个进程启动的时候都创建2016端口的Worker9 e4 F6 p6 c# u* c
- * 当执行到worker->listen()时会报Address already in use错误; T7 `- G. P! |7 \$ w, H
- * 如果worker->count=1则不会报错# V5 w) S: c3 @
- */! @7 g: h. R% `6 ?7 F( l5 ]" B
- $inner_worker = new Worker('http://0.0.0.0:2016');! N* G& [" D1 ~ [: R
- $inner_worker->onMessage = 'on_message';" S* J2 |" S0 p7 \9 O
- // 执行监听。这里会报Address already in use错误- V. s: ?- x6 z
- $inner_worker->listen();
( ?. e, p' v$ X; G2 J( K - };
4 _1 e3 F* t% z
* B) z0 L# j7 s$ e% ^6 ]) k- $worker->onMessage = 'on_message';: B- W" i, B4 n, x' Q5 j8 \2 x
- , \$ x) w, m# L% O
- function on_message($connection, $data)
: ~, L7 T# M& i, t$ z/ W - {
! _* U$ Q/ ^* R1 j ^ - $connection->send("hello\n");
' X# j! @# p+ _, j - }0 v* w% L3 |! s ^$ R
- - Q, i4 s: E& |
- // 运行worker- I1 b8 V" j! G5 N' L6 q
- Worker::runAll();
8 y% _: g' t# u% s9 _+ m w - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
' v }. `0 x4 t7 t
" j/ j. q$ x3 N. P- use Workerman\Worker;5 J0 f8 |* d" I0 l
- require_once './Workerman/Autoloader.php';
) D* W) n) o; G
& @* H( X) @9 E/ H- $worker = new Worker('text://0.0.0.0:2015');
( N# Q `5 c1 V# }6 s5 T - // 4个进程- t! E3 ?/ ?5 K1 ~' T' i
- $worker->count = 4;) L7 a& t$ v3 ]1 ~5 Q
- // 每个进程启动后在当前进程新增一个Worker监听
' h) N; u$ U: z - $worker->onWorkerStart = function($worker)
9 L* S5 a; N$ I7 R% |3 n - {
$ v7 p0 P6 f9 }; b - $inner_worker = new Worker('http://0.0.0.0:2016');1 P, B9 p- P$ d0 E. S4 G. C. C! i6 O
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)2 [2 m3 h8 y& ~+ F" C+ q! v
- $inner_worker->reusePort = true;9 x2 @& C4 ^) I0 S5 d' y5 a/ Z: Y' U
- $inner_worker->onMessage = 'on_message';
6 ]8 ~" F2 [" Z - // 执行监听。正常监听不会报错
5 U `5 O% A- O5 Z( Y0 w1 @ - $inner_worker->listen();
" M- @( G/ v4 [4 I2 j - };) S9 o; y: {+ I
- " k) l; K! V" e; c7 w
- $worker->onMessage = 'on_message';
% }: \* R. X q7 C) `; l
$ T4 ?, c; ~% f# ?, j5 t- function on_message($connection, $data), s, c$ B6 `# D
- {
1 K6 \7 ^) U7 ?7 @ - $connection->send("hello\n");
! `" I4 O8 p! s9 a - }, D9 Y% ?8 r" r* L, k1 W
6 V9 ~ v. T% S- E: y- // 运行worker
, e1 W U1 z0 R3 b - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
* q& N4 G! K$ k) S - use Workerman\Worker;
* z: o8 P$ t) C% K5 t4 w1 j - require_once './Workerman/Autoloader.php';
2 n* x* Z2 T/ K2 U( x+ k - // 初始化一个worker容器,监听1234端口* `) C+ J, e) i9 t( a
- $worker = new Worker('websocket://0.0.0.0:1234');' q. E! t V" E
% i; ?1 F$ T; C$ i" M- /*
% r, K6 z" G( Y( C - * 注意这里进程数必须设置为1,否则会报端口占用错误
3 z% ? F U" K - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
/ n6 C: ^6 c+ M* { - */3 [: |1 h, ]4 O( ` `4 G2 h
- $worker->count = 1;7 A0 C0 l" P5 n" o6 i8 f
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
! m; p! Q5 [ e. J - $worker->onWorkerStart = function($worker)
. E) f2 h# N: i1 Q) e1 D& B - {8 E: g& ~3 F5 ?4 U
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 z6 ^/ @% j e8 H- f9 }" U' D* O
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
, g m3 I6 e! U9 @. @9 ^ - $inner_text_worker->onMessage = function($connection, $buffer): O1 W! U4 ^9 J3 `7 K$ n
- {, \. l# ~" K* e+ b
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据8 O. O8 {1 j' q/ Z; G4 l' ` u
- $data = json_decode($buffer, true);
% r1 C Z' J+ F8 r- N - $uid = $data['uid'];: `8 P, C l* x
- // 通过workerman,向uid的页面推送数据
E3 u* r" ^; ~( e2 G - $ret = sendMessageByUid($uid, $buffer);
# U: [. ^# \: n- ], B - // 返回推送结果, [! g6 i7 E+ g- N6 y
- $connection->send($ret ? 'ok' : 'fail');0 H: q) U8 X; U G: ]
- };
2 V4 G1 H9 ~( b, {1 I - // ## 执行监听 ##0 s5 U, I$ H) w& X* Z0 ^
- $inner_text_worker->listen();
7 ^8 Y) L' Y" ^/ @; D; ?6 M - };" Z( f1 g# q$ W# [" ]1 ?
- // 新增加一个属性,用来保存uid到connection的映射0 D' ]" Q$ s: E# K
- $worker->uidConnections = array();. l/ `$ u u$ J, l2 x4 A, t
- // 当有客户端发来消息时执行的回调函数
: D5 O* R$ a: Q$ b& w1 O; `# K3 k$ Q - $worker->onMessage = function($connection, $data) k1 U" { R# D" X- v/ t% w
- {
3 T1 }, b- _. E) @8 |. D - global $worker;; S5 Q) x8 m7 |: }& y4 J1 y
- // 判断当前客户端是否已经验证,既是否设置了uid N: \% B7 \* A
- if(!isset($connection->uid))
* ?( y6 W+ G0 V5 ?' Z0 a+ p/ Q3 C - {
, }7 ^$ J( S- Y4 C4 K% \ - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)6 B, B$ k& z8 }) O7 m
- $connection->uid = $data;
! x2 p- L! Q' R- { - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
7 ?$ ~& A5 r5 `' Y3 Z! k! s - * 实现针对特定uid推送数据
5 e3 R5 H% m' _% E. @. b - */1 P7 Z" ?8 F4 |* D2 U
- $worker->uidConnections[$connection->uid] = $connection;
- Z+ w5 r5 m2 s& o+ U* I - return;; a- e% m9 S) \8 d' {+ w
- }4 ^2 J. ?. a% }5 x& c
- };* T$ D- a* r6 [8 p X
, B8 T6 [5 _5 C& H5 ]( G- // 当有客户端连接断开时
% b1 g9 t$ [6 v q. `) D7 ^4 M( R0 O7 j - $worker->onClose = function($connection)5 B+ ?) n9 ^) J T& P4 p* K6 x* p
- {8 H+ z- `1 n, ^) u6 F
- global $worker;
\2 r4 B; ~* e& Z - if(isset($connection->uid))) M j9 k3 w& v6 r
- {- [3 I2 ]9 Z$ I1 c
- // 连接断开时删除映射( E. F; v9 X+ n
- unset($worker->uidConnections[$connection->uid]);9 m- I6 W3 ?2 I* @- F9 p9 o
- }# s( I& f4 P |- i8 }9 w
- };$ l0 l' t; K* K
- + j, y. a: V" C) A/ T
- // 向所有验证的用户推送数据
, a( Q# i. Q( ~2 q2 `& F" B2 [+ t - function broadcast($message)
5 y( n2 o5 a& r - {# U% i P/ i4 R& y% v0 }
- global $worker;
$ y9 y& @1 V/ R1 N - foreach($worker->uidConnections as $connection)
9 P r2 x* R [ - {) V- _3 Z: Z: W v$ ^6 l2 ~# k: e- i8 E! h
- $connection->send($message);, X, S v8 d, z
- }
% y* W' T' x- _+ o - }! ^& c9 q; o2 K/ d% ?0 J
- / F( k: H2 K: d X2 w
- // 针对uid推送数据
+ ?: h! ^' u3 T3 ~9 D1 N2 t+ g - function sendMessageByUid($uid, $message)
; |7 H0 j5 U, C - {$ w& b" |9 p7 @% d3 P: ?! ?( p
- global $worker;+ h* ^( r4 ?# L+ V! w) u) H Q
- if(isset($worker->uidConnections[$uid]))
; i6 @% J. K6 q$ a+ A1 D" p - {
* \* R6 @$ T/ z& t - $connection = $worker->uidConnections[$uid];
) h$ v" W8 N( T; C - $connection->send($message);5 v' q: p( a' p
- return true;6 D! X9 H! H* k
- }; R3 F5 E( Z, P" w+ m
- return false;% `& ^- m9 g- i& B% M" C. b
- }+ Q! k% \) r# [; t2 p$ W N
$ j v! C7 h2 o% }7 m/ ]- // 运行所有的worker
" B$ X7 t# d$ @* v0 u! S - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');5 {9 r d8 \3 c8 @
- ws.onopen = function(){% Y2 }' f7 |+ q9 v( b
- var uid = 'uid1';
1 F' T2 f& V+ T! O7 e8 [9 { - ws.send(uid);& S, C/ y `- S3 p; A/ K# Z. i
- };
& \; _* \- b7 j. _# m# i - ws.onmessage = function(e){$ v# o' R# i7 G0 l9 A9 I3 C
- alert(e.data);$ W6 T& ^4 [0 p2 D
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
# {: u1 x/ n8 f- L& e# Z - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);3 ?6 @5 K& r1 a# W0 N
- // 推送的数据,包含uid字段,表示是给这个uid推送
; o% h9 j2 ] ^* v1 K - $data = array('uid'=>'uid1', 'percent'=>'88%');
4 o) g. d6 p+ d2 b& m' } - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符$ J0 D/ ~* j( v8 ~& T8 ^
- fwrite($client, json_encode($data)."\n"); _0 i/ L- K, I) q
- // 读取推送结果; s& I* H9 \$ y4 l
- echo fread($client, 8192);
复制代码 , i) U% j4 _1 E0 f% {
& Z* X, `, u3 v1 [- m |