- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;
6 a1 v( r* @. r0 }1 u% v - require_once __DIR__ . '/Workerman/Autoloader.php';9 G0 d$ B% ?1 d- ]5 Z/ C' H- c& d
, ^, K0 a, g' X8 J- $worker = new Worker();1 U3 }$ f& L8 ^: g! q
- // 4个进程& t& |) Z& I. {2 M2 H! h/ d6 p
- $worker->count = 4;" ~# m- x E: Q2 a
- // 每个进程启动后在当前进程新增一个Worker监听: I M9 B3 m; M( {# e# E. f
- $worker->onWorkerStart = function($worker), [4 K1 ]. }4 ]7 s3 J
- {, N0 K: A# J3 u& Q4 c, ^/ q d
- /**
3 C/ o& _6 U8 x6 U. ^ - * 4个进程启动的时候都创建2016端口的Worker
R1 u& f, ^" S3 w9 j+ } - * 当执行到worker->listen()时会报Address already in use错误1 E, A3 {( ^$ _2 X5 m! D1 l
- * 如果worker->count=1则不会报错0 Q0 j5 z Z4 F! d3 B
- */" s& ?) v% L& P8 Y
- $inner_worker = new Worker('http://0.0.0.0:2016');
7 [; }) i7 q2 P( x( x - $inner_worker->onMessage = 'on_message';- N" `: A" ^, ?0 M. L
- // 执行监听。这里会报Address already in use错误* q. O$ G# O' ?
- $inner_worker->listen();
2 V: L9 S2 E! u+ o - };
! d& z ]4 j: A5 a# F- S5 l
, r% ]0 T; E# D7 V- $worker->onMessage = 'on_message';8 U0 U- l. W1 d; |( I' C Z, q
- ) X! U4 M* Y5 y3 P: m
- function on_message($connection, $data). H! }5 s( g/ ~2 R( H [' Y
- {' }$ g/ _) X' B5 B
- $connection->send("hello\n");. N/ A9 _9 v" }3 K% Y% h* O
- }2 t+ A, @% I: x; R$ i, ~
- . d) w% _. A1 X+ O4 {
- // 运行worker6 f) W" G2 G: L- a% h. A$ B2 _
- Worker::runAll();- v/ S9 m% w& G) E1 u+ Y2 h9 o
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
* J# |+ Q" k( F9 W' E
" E) h$ I% r$ e/ e- use Workerman\Worker;
" m. t, H+ c1 C0 `: _# D/ w! e% I - require_once './Workerman/Autoloader.php';
3 S2 D5 o% w$ I
5 @4 I Q$ e0 C# B- $worker = new Worker('text://0.0.0.0:2015');* a' M) ]% M9 k& r1 t% M
- // 4个进程
; b3 L3 y0 `! \: ~4 Y - $worker->count = 4;
$ o4 K. p; e' s+ u: x: } - // 每个进程启动后在当前进程新增一个Worker监听
' n4 {2 i, N1 o" R# U# x3 G; K o7 D - $worker->onWorkerStart = function($worker)
1 {5 M4 D( V& l4 q - {) L; c, N9 M9 ` v
- $inner_worker = new Worker('http://0.0.0.0:2016');# w5 K' O9 ]# Y% q) g
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
* f5 E* R8 }' q* }8 r - $inner_worker->reusePort = true;! q! n3 Q6 h& q4 r, Z
- $inner_worker->onMessage = 'on_message';9 `+ m& L! s! V+ {& g8 q1 \
- // 执行监听。正常监听不会报错/ _ [$ h- x- Y
- $inner_worker->listen();+ }( l4 I) n% x; i, B+ ~
- };9 G4 \9 {# R/ f! L
( T. |4 m, x, W' f U9 g* j- q) k, e- $worker->onMessage = 'on_message';! \# o; v3 v; v: Y1 `" h0 F: e) ?
$ Z0 s! K5 s5 D2 M- function on_message($connection, $data)) n# l/ y; _+ e' J6 s
- {
- d5 g% R; d9 W. ^" v; ~, O n; }& Q - $connection->send("hello\n");2 m8 r( T3 t7 H, C
- }: L5 N+ f1 Q/ ]. f8 c: i8 ]% y% v
- 4 _- o( T# v1 ?
- // 运行worker# x) s3 ?" l- |) p2 N! y2 k9 A
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php5 B3 S; L/ N. Y7 U
- use Workerman\Worker;! ~$ Y! B$ h- W/ V3 X
- require_once './Workerman/Autoloader.php';
% V* t1 _( b; z, }6 Y8 A - // 初始化一个worker容器,监听1234端口0 @4 [5 [; U$ c9 P% f
- $worker = new Worker('websocket://0.0.0.0:1234');
! ~3 T3 u( A8 ]$ w3 ~) N - + ?" W F2 {8 t7 f- F
- /*! R/ O. O9 n- m) A" ~& t6 `
- * 注意这里进程数必须设置为1,否则会报端口占用错误
( p& _+ |. i/ H# X j - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
3 c7 P- B' n0 J6 | - */) R9 H0 Z' m* ~' q
- $worker->count = 1;
' s3 a# j0 F. ]! R7 x5 Z0 h - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口
- G% R7 _, C+ p& j/ e% B - $worker->onWorkerStart = function($worker)& c7 ?. ]) W% d4 F* v
- {# S$ K& I2 B5 f0 g4 S! Y! y
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符6 F# ?* y9 j! n
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
. o! K" ~9 y0 k4 [ - $inner_text_worker->onMessage = function($connection, $buffer)4 J( l* B: b' O( v
- {! X2 H$ r4 p( O3 M/ J" r
- // $data数组格式,里面有uid,表示向那个uid的页面推送数据2 N& u9 q& W9 ?" _8 K- Z
- $data = json_decode($buffer, true);9 |. y0 ?* D% R) m) O' A) k! O5 l
- $uid = $data['uid'];
5 [6 t9 m# f1 P3 L7 S - // 通过workerman,向uid的页面推送数据: a# }0 {& l* y2 B2 Z- w5 i
- $ret = sendMessageByUid($uid, $buffer);
( S. p' P+ E2 R! E) Q/ @+ g$ O - // 返回推送结果
( C! z6 B- H- Q1 ]4 D8 _ - $connection->send($ret ? 'ok' : 'fail');
3 ^5 z; l( ]2 O8 c2 W5 Y+ |0 k - };) l* ^+ o' m; m( D; @
- // ## 执行监听 ##
' ` c M" t$ S' O: W& F- w& R2 K - $inner_text_worker->listen();; o- I; G( ]8 c% s5 n
- };
7 R2 Y4 R2 Q2 p4 m& [( j8 N. N8 ]+ Q - // 新增加一个属性,用来保存uid到connection的映射# w; Z( ]" B. N s! d2 h
- $worker->uidConnections = array();* ^9 ~/ A! _4 H, ]) K* W8 K# \
- // 当有客户端发来消息时执行的回调函数; k- S" F: O5 J3 M- T
- $worker->onMessage = function($connection, $data)
* a' ]" Y) Q6 a& F( s- Z - {
3 ]0 W* Q5 ?1 `! Y7 F9 a - global $worker;
! |6 N. `) c! v% K1 p+ O* p$ i% |' ~9 Q - // 判断当前客户端是否已经验证,既是否设置了uid
& ~6 H( u% ~- s4 ~5 W i- ~3 i - if(!isset($connection->uid))! ]+ ~5 j+ P9 |- C" F3 [
- {
7 ^2 Y: p4 i8 L+ K - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)* c' k3 ?8 y% [! m# z Q4 D! J
- $connection->uid = $data;
2 U( @$ o, d8 }+ Y. } - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection," h$ [/ e% {/ K% v5 `
- * 实现针对特定uid推送数据
6 [% f: O$ s" O% e - */7 f+ \5 b- n, I5 i- y6 W. D: d5 L
- $worker->uidConnections[$connection->uid] = $connection;
/ l5 y7 D2 ^9 f' Z f8 \7 U - return;
7 e/ W, M! v; |/ c/ j: _: ^% O - }5 |0 }2 J5 N g$ k4 z5 t# d: Y* |
- };1 y3 H; K3 m' |$ n8 e
' e( q D# N3 @" P& a4 l$ h/ }) h- // 当有客户端连接断开时
/ h5 g$ W) F. A - $worker->onClose = function($connection)
3 ?. i& o4 _9 ` - {
$ j' g/ E$ a5 T2 e# }9 J - global $worker;
$ e9 i" K* l; O) \& { C - if(isset($connection->uid))
# P9 q. _8 p0 t - {
6 Q5 J/ ~5 k V2 G - // 连接断开时删除映射
. [$ d, x ?9 V, F - unset($worker->uidConnections[$connection->uid]);+ T3 P5 W$ z) z [4 ^9 R# m7 X
- }
$ f5 s: s! U& M2 E - };
2 G. o2 P- ^2 _1 Y X
. `* `9 I# }, p2 c- // 向所有验证的用户推送数据2 c1 Q- |& t# t) s
- function broadcast($message)
0 T. ?- P# f( m7 ]3 G! i - {! ]3 [; n5 m3 o+ Q- H
- global $worker;! a" m$ R: u+ b4 X
- foreach($worker->uidConnections as $connection)
. Q/ b% ?2 V/ n* o; P0 J, r - {$ l4 Z) h1 B0 J! w/ F
- $connection->send($message);; F0 I* M. a& C8 n
- }5 a/ _& ]0 M8 n
- }
, K8 ~! i& D% E6 l - 3 I9 ^7 X R$ F9 ]
- // 针对uid推送数据
o& v; X; }7 E7 k0 u - function sendMessageByUid($uid, $message)
; P0 Q1 q2 g; l' m& s - {3 }2 O( ^ b$ Z5 g1 A: |* g( H, B q8 J
- global $worker;% @) n8 g# v" t
- if(isset($worker->uidConnections[$uid]))
& T( P' w) A( N - {
: n ^* {. I+ Q8 K+ B p& N: C - $connection = $worker->uidConnections[$uid];
! S4 H3 v- m/ j5 D! Z6 H - $connection->send($message);/ m: H2 J5 ]) F( s2 n) H* \6 U
- return true;
; o8 P5 |8 T. p! Q7 z2 S9 n - }
+ H4 d# E2 H% ]8 r0 f9 l F - return false;9 y3 F" S0 |1 j6 E
- } z6 |! C/ }/ Y# W" e8 @
- 8 ?" T' t7 _3 y% ^0 Q& |$ ^
- // 运行所有的worker
, B2 t7 _5 ]- d! W8 R2 l2 F - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');
' B! A* J' x) M - ws.onopen = function(){
& ~$ ]7 C. {! V - var uid = 'uid1';
9 Y" M S& U* A& B6 i" g, @ - ws.send(uid);* y9 e7 A1 }. K% ~3 r/ f. ]
- };# Z# y3 K4 s' {
- ws.onmessage = function(e){. W3 ~- ?, W/ h( C- U
- alert(e.data);4 A! i6 T* d8 ]/ C, s
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口. y, g' t. U+ s2 m, |: B: e
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 M! _' b4 ?' T: \
- // 推送的数据,包含uid字段,表示是给这个uid推送. l3 z1 X6 c6 O1 H( P! _
- $data = array('uid'=>'uid1', 'percent'=>'88%');: Q; z3 K d9 D/ O+ V5 H h8 N! k! G
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符8 I1 z+ d2 ?: y/ O2 c
- fwrite($client, json_encode($data)."\n");
5 m+ ~6 S5 g; f- T - // 读取推送结果3 m& u3 ~2 X9 F2 R1 b' {; G
- echo fread($client, 8192);
复制代码 " V+ |, L/ w1 \( m; |
$ x8 g' G H& B$ Y
|