- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;8 I A! ?* r) W! l9 }
- require_once __DIR__ . '/Workerman/Autoloader.php';, G- k7 o0 G/ e7 p' f
9 j" V4 v& {3 v* l- I! q- $worker = new Worker();5 S+ X& f' \8 j7 q& p, ` t) H
- // 4个进程; H$ o" E, z+ {1 i" A0 R `
- $worker->count = 4;+ \2 W. T6 z @4 Q- V
- // 每个进程启动后在当前进程新增一个Worker监听
1 C7 X' D- K3 t3 ?7 v - $worker->onWorkerStart = function($worker), I( z* V4 w: O
- {/ s; b4 i1 w: @& N
- /**
2 B8 r- `3 {- l* z( y - * 4个进程启动的时候都创建2016端口的Worker
' v2 u2 Y9 S6 w3 }; b - * 当执行到worker->listen()时会报Address already in use错误2 V, J4 g: _! K+ K+ ~5 _
- * 如果worker->count=1则不会报错
* ^: k; S. R7 C% ~6 H( ] - */) i$ t* r4 A4 m! t$ Y
- $inner_worker = new Worker('http://0.0.0.0:2016');' S ?: P2 {1 u1 a: y( ]# V) z
- $inner_worker->onMessage = 'on_message';) h& q: {6 c% |* F
- // 执行监听。这里会报Address already in use错误% N9 v2 [ @- t" p! I: {# }, e
- $inner_worker->listen();- f' O+ ^3 p; Z" a" R" [
- };0 d+ E- M. V+ o* e
5 [$ r' p* w0 O4 w* f- $worker->onMessage = 'on_message';- R4 L* x0 ^' Q8 C: z/ Z
- & K7 c- x% ~" r; K, O) R
- function on_message($connection, $data)/ a; Y% _& K, Z, b+ T0 t) F$ I
- {/ y8 l1 h$ W! z4 U
- $connection->send("hello\n");
' ?5 ?" ]) I3 Z% ^& E1 r" K4 d - }7 A# x' C! [8 o, r; o' Y$ G& m
- 4 x0 [: B9 v/ w/ @' |; D
- // 运行worker1 L, @: \4 o+ j, L* D, A( S
- Worker::runAll();
9 |* _3 n) q0 E6 D) z - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:# }* t B$ A: a( |
, E. J8 {: E x& P- use Workerman\Worker;
1 N& ~8 u2 ^6 T* r$ _- W - require_once './Workerman/Autoloader.php';; A/ d2 u) E. q
- 8 s; p: z* I/ ]
- $worker = new Worker('text://0.0.0.0:2015');/ K' e! ]% C' f7 u* F! R3 D# d+ M
- // 4个进程$ s0 ~' U+ d' l! e4 t% B' q! G
- $worker->count = 4;
- C8 x' B& O' i4 [- p; F) T4 A - // 每个进程启动后在当前进程新增一个Worker监听2 R' r% }2 e+ }( m( m7 a! C
- $worker->onWorkerStart = function($worker): o5 D- j' z1 Z8 m+ i& H0 k: W# N
- {1 Y% H, u: a. p, \1 v1 t4 K* h- ^
- $inner_worker = new Worker('http://0.0.0.0:2016');
5 Z. Y5 q! U: j- n - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
* R( {0 n" ]+ I" b# R" H - $inner_worker->reusePort = true;
5 l8 P8 L5 O3 l. o5 Z - $inner_worker->onMessage = 'on_message';3 R4 \5 X* c+ T8 Z9 k
- // 执行监听。正常监听不会报错% X0 D6 ~( e- d
- $inner_worker->listen();, C$ n6 ]' q6 ~4 D$ A; n
- };( G; h2 F: C& e( e% g
- 2 W! ?2 o" o5 [5 O" K+ q4 S2 B$ m
- $worker->onMessage = 'on_message';
! M$ O8 c7 L1 \, @) K
" k$ N! v9 S4 S9 i% S4 X8 r) T- function on_message($connection, $data)
}" b& G6 x6 @ - {
$ C+ ?, r9 e2 f# A - $connection->send("hello\n");5 \' F; H6 I0 t5 ?* w" }
- }5 _( S/ S: [" ~! e9 s# l: g" G! [
w9 g0 R% f& o ?. X- // 运行worker
6 Y. t- S0 L' _' j# |. h5 z% S% m - Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
0 j% Y* x: I9 r6 m3 c1 R - use Workerman\Worker;: p9 w8 X& {# Z
- require_once './Workerman/Autoloader.php';
) f" u3 J) ~0 l1 s. w, h( G - // 初始化一个worker容器,监听1234端口
/ q- P/ c [. J( G2 i# c/ P* ^ - $worker = new Worker('websocket://0.0.0.0:1234');
) t3 `8 h0 }: d4 N& ^
0 k" s* z/ \3 Y& `$ x- /*
) e. e Y- f+ E+ X8 t - * 注意这里进程数必须设置为1,否则会报端口占用错误; K; t8 C. k& w1 z; j |7 l
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
B' a, w' h# T - */+ A! h1 V' {$ w$ r" \# R
- $worker->count = 1;5 `9 G2 Q9 I3 [8 i% a- G" U' n5 x* j! o
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口. U9 @6 w% q3 E# E
- $worker->onWorkerStart = function($worker)6 d, A; Z* W {) h
- {
) V1 w3 x, [5 E4 n. u. I - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
; K3 e- P; Z# |! ^ - $inner_text_worker = new Worker('text://0.0.0.0:5678');8 y" V' i( Z3 y5 o& ~* v
- $inner_text_worker->onMessage = function($connection, $buffer)
8 M) v( t" F. e$ _$ A6 l/ I. G - {
: d* O6 G2 e" H% n$ v+ B1 d - // $data数组格式,里面有uid,表示向那个uid的页面推送数据
: x5 H9 z' S& z, F% t - $data = json_decode($buffer, true);+ A4 w8 b$ e# ~+ k
- $uid = $data['uid'];* L6 v) q6 V( z
- // 通过workerman,向uid的页面推送数据4 [& `. w2 i; G' e2 B3 b
- $ret = sendMessageByUid($uid, $buffer);
& ~$ U) z; Q3 @ - // 返回推送结果4 T/ E( L4 J) y5 O
- $connection->send($ret ? 'ok' : 'fail');- K; B# z5 ^) i' ]+ \
- };
+ u! z0 f9 J! [% k$ K - // ## 执行监听 ##
/ i1 I R. D& R3 E, l1 W - $inner_text_worker->listen();
# r& g! V0 M; \1 o4 o - }; m9 y! f, v- j# M: }. H2 R& q1 `
- // 新增加一个属性,用来保存uid到connection的映射3 ?) o& Y( W* c! s
- $worker->uidConnections = array();
" b# n" ?) v1 K9 f7 |" P - // 当有客户端发来消息时执行的回调函数
5 G; d' [. G# G# J$ f" e - $worker->onMessage = function($connection, $data)
# o) ^3 a" y( o; U - {) {3 }/ q0 u2 {1 [4 _3 x+ d
- global $worker;
, L( y! @3 x* L9 ]: ` - // 判断当前客户端是否已经验证,既是否设置了uid* W( h8 V+ l, Y+ _. X
- if(!isset($connection->uid))% Q9 b2 ?% T6 U I Y
- {
$ b3 W9 }; Q4 q7 ~ - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)' i7 ]- V" j4 J- L
- $connection->uid = $data;( _0 C- p# x# @' D: S: E
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
( Z/ U0 w; u0 R" z7 z - * 实现针对特定uid推送数据/ s0 [0 J/ m$ Q ^/ D
- */) ^ v, \4 w( H! F
- $worker->uidConnections[$connection->uid] = $connection;
7 b! K" V! U5 j( W" F - return;! L8 d# s0 O% ~3 N
- }3 x, ]9 H4 W1 b8 l; F+ u
- };
! W- _/ ]+ O4 z8 d* C) i - ( r0 R1 W7 h8 G* F2 o, e, O8 Q
- // 当有客户端连接断开时6 u" N! @% r8 N; Y" N: b
- $worker->onClose = function($connection)/ H' J/ @) k7 J3 N5 d; ] A
- {
# R# f) \4 P( T, v4 e( r7 e7 v' Q - global $worker;
5 a: |" J2 J* D) T2 [ - if(isset($connection->uid))" w( f! y+ @5 \/ z2 E
- {: u: X- z1 I& n; g! i4 O0 J
- // 连接断开时删除映射
& A3 g; ~: j/ J% o6 g - unset($worker->uidConnections[$connection->uid]);9 ^5 a6 I, l9 k( p
- }$ C& I# W1 w. _0 [% L! J
- };) M' W1 {: g3 f ]5 a8 f
/ T/ H" T1 [& R; u& x s0 b' f- // 向所有验证的用户推送数据
d6 q G- f' t/ t! ^ - function broadcast($message)% o, f" k0 S, i0 z" X! q4 W
- {& j( s; h3 y9 W! o
- global $worker; R" g3 j" q+ ?# T g; e0 f- @
- foreach($worker->uidConnections as $connection)
" g- a! w4 p$ }6 {1 o - {# @2 X& ~& U2 M! N; N
- $connection->send($message);4 J5 W! O% ~# B# p @7 U
- }* {' }9 z1 s4 B' g
- }
3 f* u# T/ s* i) c2 n - 8 N! W) _! l' U! o( X$ D
- // 针对uid推送数据
7 ~3 l) m. N( [* v; j) \7 B+ V( q - function sendMessageByUid($uid, $message)
' N7 ~5 c: c; F/ L8 ] F) t9 {. z - {
' a7 K* Z8 T5 {' z0 o. c3 `- N - global $worker;
; W% K) p) l# L9 I - if(isset($worker->uidConnections[$uid]))
, M6 ^, t1 N/ L& q! a2 S - {. @" v$ X+ l/ f8 @3 e; B
- $connection = $worker->uidConnections[$uid];9 O- r8 Q: L9 }/ d8 Z8 d
- $connection->send($message);& M/ C1 h' @8 d" M4 w [/ W! y, g
- return true;
! m" r6 z( \7 K S1 c; j& \/ E8 A$ g - }6 y P2 {" |9 F F
- return false;
I) q) m9 ]: F. |$ c" A - }: _ @0 H+ D3 z. l3 j7 r3 S
* l8 y# K6 t$ m* T- // 运行所有的worker
/ P2 _% Y+ V+ T4 }1 ? - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');1 W' v9 i# l, u
- ws.onopen = function(){6 @* K7 W8 c# {
- var uid = 'uid1';
" L1 W$ L1 c1 |6 {) ? - ws.send(uid);" d! W1 F7 @$ n5 y ]) X
- };
9 x6 d. ~' s% O" p/ \3 q: g3 Z* p - ws.onmessage = function(e){* t( ]" J2 j# x
- alert(e.data);5 @* H+ @4 q% Q k8 m
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
' b( t- k8 P$ k- t( w7 } - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);6 ]+ F' m8 S# Q I' S# P
- // 推送的数据,包含uid字段,表示是给这个uid推送
! z6 H" B7 W; V4 f' o( L/ L6 H4 i - $data = array('uid'=>'uid1', 'percent'=>'88%');# w" X& r4 r- V4 Z
- // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符0 E0 a. T% J) {. J4 D
- fwrite($client, json_encode($data)."\n");
- s, u& K6 V3 {7 X% g - // 读取推送结果. y6 _, E7 a L% p2 [5 p+ K& y
- echo fread($client, 8192);
复制代码
; ]' |: d/ m. ?: H8 u; f3 o4 o: O$ [$ s
|