- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;; `2 e/ ]# N2 |0 `# g$ g
- require_once __DIR__ . '/Workerman/Autoloader.php';% O5 }2 P. ], l
. K4 k9 N6 c i, l) f- $worker = new Worker();2 S) {4 k3 s/ L* w% h- G! G2 ^' q
- // 4个进程
9 E& Z: ]- V) l& v- e) l( j - $worker->count = 4;2 [( E2 {+ \9 C; _
- // 每个进程启动后在当前进程新增一个Worker监听
' q. z$ q2 s- F: A - $worker->onWorkerStart = function($worker)0 J1 A L; n) d, E N
- {/ }0 r$ [0 `# ]9 v; w( {
- /**& O: k" H: a9 r2 }
- * 4个进程启动的时候都创建2016端口的Worker
7 v- _ N9 T. _' d/ E - * 当执行到worker->listen()时会报Address already in use错误" E: x$ f" ]6 V. T
- * 如果worker->count=1则不会报错1 M- m. ]$ n2 A+ d& ~6 b/ [' m
- */% U( R) b' z$ A. X8 [9 @
- $inner_worker = new Worker('http://0.0.0.0:2016');9 Y6 ~& ~& O4 \& t0 K$ B( e1 g9 b
- $inner_worker->onMessage = 'on_message';
, N: {/ `3 |- P) d - // 执行监听。这里会报Address already in use错误
. J# m4 u8 I/ D R - $inner_worker->listen();; [8 L! i' j; {4 W
- };
; B) I; T- ?7 Z" y1 d - $ [; Q1 a1 B) @* l! h% \
- $worker->onMessage = 'on_message';* o) ^, [& u$ ?, @# `
) j* F* ], x6 f7 z. e# ~! F- function on_message($connection, $data)
B9 c- Q; k0 n8 _ }! Z5 v - {6 p/ \, M6 c# _
- $connection->send("hello\n");8 h5 \) u& U4 y: o# d6 Q. k
- }* ]( `5 C; z- b6 k' X
1 e6 u9 e9 y9 S; e' o W2 c- F- // 运行worker
6 M8 Z+ B2 S0 O F( G* y6 S) y - Worker::runAll();
% u( ~2 z0 P3 f& B - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:+ _$ W! d$ Y3 R! s" \7 ~5 v/ ?& l
9 q/ l9 B3 X5 W) y; U- use Workerman\Worker;
& i$ d F u$ V+ ]% r) S - require_once './Workerman/Autoloader.php';
. J! Q7 Y7 l6 a) i9 Z5 k
; p& ? ]. l5 t- $worker = new Worker('text://0.0.0.0:2015');$ f/ |' q$ H5 M8 ^5 [
- // 4个进程9 F# V3 }" Z4 Z( E1 L% F8 C( Q& `
- $worker->count = 4;1 n/ g4 S2 L% w! t3 Q" i
- // 每个进程启动后在当前进程新增一个Worker监听
& |/ Z0 w0 i# T6 N" u2 ^2 e - $worker->onWorkerStart = function($worker)8 o5 W4 I i! q' B" S5 q
- {
1 `) G# p! n ` - $inner_worker = new Worker('http://0.0.0.0:2016');4 e" r4 Z2 k' d1 Y% Z
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
' _5 L/ b' D7 {5 i! M - $inner_worker->reusePort = true;& c% B T+ R' }2 p' q1 ?& c
- $inner_worker->onMessage = 'on_message';/ c- Y _: U7 J- F' l7 O/ z
- // 执行监听。正常监听不会报错
0 [ ~2 P2 P1 K7 A - $inner_worker->listen();. H- f0 W$ q% ?7 h ?5 N' Y
- };
& o: |" T: u$ W5 r* G8 A! a( r: r
) ~1 _* L0 H9 s, C3 P- $worker->onMessage = 'on_message';
/ T8 p, _+ K; z - * n- M! G5 F% a8 Y9 `
- function on_message($connection, $data)+ j2 f* F1 r1 Q" }/ G- r& r
- {: ]$ v1 `6 k2 j; |2 n
- $connection->send("hello\n");
9 [8 d7 {. S% I8 `7 Y1 y7 b - }
+ c9 s' b+ P/ p7 U# [5 Z! q
; Z8 r3 ^) E! @! \1 Q- // 运行worker5 A0 c E4 B F8 N
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php5 o R0 n8 O0 P' x/ H! ]2 W
- use Workerman\Worker;5 x3 V, Z& A/ [: Q% z
- require_once './Workerman/Autoloader.php';
, Y, }) ?2 U2 J" u" F6 N6 } - // 初始化一个worker容器,监听1234端口! v. z3 e% ^3 A7 t
- $worker = new Worker('websocket://0.0.0.0:1234');1 |# C. F) w2 J( I5 u6 }
C: C, Q7 E5 l$ ~+ Q! k/ ?- /*8 M5 d' Z% x" x S% A' O
- * 注意这里进程数必须设置为1,否则会报端口占用错误
( n+ T. W4 b9 E4 y& c5 T* v | - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
7 [) f5 y8 C- s! C1 Z - */
8 F; z; u3 S2 n* h+ ~( U- s4 M0 Q - $worker->count = 1;
1 `- H, A0 Y# f3 Z - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口2 ^( u; [8 J8 @1 a
- $worker->onWorkerStart = function($worker)" ]# S k9 \ w9 I
- {
- g" b$ v3 l' B: b- Z3 G - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符0 ?" f4 p- V9 t+ g
- $inner_text_worker = new Worker('text://0.0.0.0:5678');4 G. P- P: [5 } C; j: N
- $inner_text_worker->onMessage = function($connection, $buffer)
& `* e8 ]- K; }* L0 e D: d) }) S' O* c - {
* y- a) d" h' ?: W+ s. J" E - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
. y+ S& R4 \( M - $data = json_decode($buffer, true);
- L/ X' h3 Q5 P - $uid = $data['uid'];
4 e: [- `) q# J2 t. G& J - // 通过workerman,向uid的页面推送数据
4 k& U- i, v$ v2 m8 k - $ret = sendMessageByUid($uid, $buffer);- Z. ?! d* Y0 S+ w0 t+ B
- // 返回推送结果
; ?5 k: ^+ `# l. z7 e0 S4 { - $connection->send($ret ? 'ok' : 'fail');9 a: |" H! W- R
- };
5 l6 w9 l2 R7 J( m9 O - // ## 执行监听 ##
' l9 s" }* | g8 f7 Y# t& t% I - $inner_text_worker->listen(); U% m4 }, o6 c$ p6 v- g
- };
, {) N5 S( ?/ C0 ?2 L$ l) ~9 l* ` - // 新增加一个属性,用来保存uid到connection的映射
. i7 `" T" t7 E# ]; |, z& Y - $worker->uidConnections = array();6 F( d" e0 E4 l, ?
- // 当有客户端发来消息时执行的回调函数
* V8 h* j+ |4 T& U/ W - $worker->onMessage = function($connection, $data)9 V" Q' G4 P( X8 S Q
- { ~$ q4 r/ W) [% b( Z
- global $worker;
! {- r6 I% H: u" j# W) j - // 判断当前客户端是否已经验证,既是否设置了uid
* t3 f& n0 f/ q- l& d# u, } - if(!isset($connection->uid))0 Y9 m3 |: Y$ Q7 \3 L& p$ X
- {
5 V0 a# B. Q5 b - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)) |5 z9 k9 Q4 z9 _0 C/ Q. E
- $connection->uid = $data;2 u6 w3 G3 a( Y
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
4 P* g; v" u2 X1 J! i) B - * 实现针对特定uid推送数据8 F. U1 B4 ~5 e
- */5 j% `4 {9 |" D3 u7 L# N. \% J
- $worker->uidConnections[$connection->uid] = $connection;6 t0 H! @- o8 v, J1 X; A6 O) q
- return;
0 W* Q" s1 m2 h- L* e& r - }6 w" l: l Z# |+ S L
- };2 A. l1 | I# D- g, y6 T8 j
- ( E' f2 Z) r6 [' z, m
- // 当有客户端连接断开时 j& {6 t5 B9 X; s1 \2 x
- $worker->onClose = function($connection)$ K" Y4 r+ s: g* ~4 O; x0 a
- {
- L, B4 v9 n8 ?, ?3 F - global $worker;2 G# _+ O7 y7 k* h7 v3 M* {) m" l
- if(isset($connection->uid))0 b5 Z3 J. p/ {- v/ q1 ]
- {
! w' }/ S4 E9 ?4 K0 | - // 连接断开时删除映射
4 C% w( H0 W# ~ - unset($worker->uidConnections[$connection->uid]);
6 I& {% {* P- }; h9 l( d - }
9 x+ ]; O) I' n6 D - };) @( X- l# i% I+ N$ J+ a1 I0 t8 P b
( p$ e' s! \/ r3 y- // 向所有验证的用户推送数据
5 [6 ?: h5 t' H8 A" f/ `* [$ x+ v. D - function broadcast($message)
3 M& u% ?; V. p( w1 l! r0 l+ `. T' Y - {% q* ]0 h( I) A% a! h4 `% x
- global $worker;' i3 b1 ?) L1 Y, V8 b
- foreach($worker->uidConnections as $connection)
% R. `) a! {, g& D5 Y) Y - {3 o) L7 o" M6 [' s u% q* A
- $connection->send($message);; N% U. K/ ~; L( @5 u& ?7 j" u& N
- }
8 E _. Q; i/ ]* V - }
$ P( i" H/ h3 c7 R
4 U2 c0 u* S2 k- // 针对uid推送数据
8 Y. J4 z1 u% v4 m8 O% W - function sendMessageByUid($uid, $message)& T- L# `. C/ F8 [7 D" x6 j5 o5 y/ f ^
- {
5 L: R4 r. @, h! o$ Z8 e0 z( V - global $worker;
`% f; O0 P; u - if(isset($worker->uidConnections[$uid]))7 R) H, e4 c" m0 f: Q
- {' C. o( s0 [4 D4 u3 e" f+ P3 ]9 N
- $connection = $worker->uidConnections[$uid];
7 k) I( G, I/ Q9 y4 |8 ? - $connection->send($message);
8 t( G4 a6 Z# i& P - return true;# | U! ~9 c1 N+ z
- }
8 B2 q* I2 W. K8 Y0 T - return false;
9 f4 c% g6 U$ l - }9 M( P* ~$ A8 n% y
- 2 x4 K5 T" B4 `3 x. N
- // 运行所有的worker
u% H# L+ x4 z6 P - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');3 P" J1 u9 k: l
- ws.onopen = function(){2 c- s5 e7 ]6 l& w
- var uid = 'uid1';* f- w5 o, t ?' X
- ws.send(uid);
+ C! L' w1 _ m - };; X9 F) `* W& |- y& u" w% `
- ws.onmessage = function(e){
% `9 ~ a1 f- S" _ - alert(e.data);2 }' o! Y0 w0 {% b' m5 @* K4 h, h
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
: U/ F5 H! f5 z3 t# I - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);& s4 b9 M! v( [5 ~3 k6 k! X% d
- // 推送的数据,包含uid字段,表示是给这个uid推送7 @/ V5 C% N, [* ]# O% e
- $data = array('uid'=>'uid1', 'percent'=>'88%');: [6 ^+ M- y7 k* f5 w5 M
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符* c: p8 y; |$ L+ h0 K! g
- fwrite($client, json_encode($data)."\n");
5 C6 U& ^: Q2 }+ ?& y - // 读取推送结果
4 z8 d6 p0 a0 _ s - echo fread($client, 8192);
复制代码
/ J$ N) Z8 Z$ C9 z: k; S Q/ H9 r& x8 D9 n
|