- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。 此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。 例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。 注意: 如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。 - use Workerman\Worker;3 m2 k j% V* E
- require_once __DIR__ . '/Workerman/Autoloader.php';) E3 c1 G3 A, H, s' p
- b5 B0 J7 C" ?, L1 ?- ]- $worker = new Worker();# r" H* s! a. P3 k2 t
- // 4个进程
- F! `0 `" d; K i& F - $worker->count = 4;( V6 S# q5 }: Y1 j# L
- // 每个进程启动后在当前进程新增一个Worker监听5 b& V' e$ D' _8 Q/ E, ^( K" c m
- $worker->onWorkerStart = function($worker)
5 z# o: `/ U2 J6 T5 x - {
0 F$ Z; I% W( @1 O - /**
: G3 n4 u7 j, i& F& \& T - * 4个进程启动的时候都创建2016端口的Worker5 J3 }+ {* _! S' z, S" k J9 H
- * 当执行到worker->listen()时会报Address already in use错误, t$ c9 y( c) p# E/ M
- * 如果worker->count=1则不会报错' ]" Y0 J2 U/ \
- */
' o) E( q* i$ C# O" j8 O# X4 P - $inner_worker = new Worker('http://0.0.0.0:2016');
! h5 t$ J1 c8 {, V. a - $inner_worker->onMessage = 'on_message';
4 V! Q" |3 B- x3 I5 J6 k - // 执行监听。这里会报Address already in use错误
- y9 n# [' i, O$ ] - $inner_worker->listen();4 S6 |3 f& i$ O9 r: {
- };) j( V3 Z, f7 T+ z% ~8 d" a
k; U. `$ e- h4 b# ~% _- $worker->onMessage = 'on_message';
+ \; ]+ w; J2 c ~ H/ v/ X
, i9 q& _5 o/ K% |- function on_message($connection, $data)
z/ t1 ]7 l/ \0 X b3 K: ~0 Y - {. q; ?: d, m/ C4 m! G( R
- $connection->send("hello\n");
. ~ _& i2 m0 M/ V' z - }* l# K1 e( U# N2 q& q6 C
) v9 M8 S3 \+ o- // 运行worker
- V/ W3 C8 _, q: s- q$ Z4 I - Worker::runAll();
. w2 a5 w2 E6 E: k' c - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( }( {0 q. L/ b+ p% f
- : p0 |/ \$ g; E7 [
- use Workerman\Worker;) _4 {1 G% p, V# Y9 F. C
- require_once './Workerman/Autoloader.php';
: p, p. J& ^0 Q( m% z7 t$ T5 D6 m! r
9 s" B& Z7 m8 O& P9 b- $worker = new Worker('text://0.0.0.0:2015');! _; u' w' ?& Y) z$ V( ?6 D
- // 4个进程
+ ?/ `* U) _ U C1 B" N2 p - $worker->count = 4;
2 W; A; D1 B7 C) a5 Q0 K - // 每个进程启动后在当前进程新增一个Worker监听% w: S$ f) e& i2 M! `
- $worker->onWorkerStart = function($worker)
R0 D( I. C7 g6 ~3 ^ A, | - {
, g- V4 R* }# R - $inner_worker = new Worker('http://0.0.0.0:2016');
, W/ Y% @& P% g6 d; j - // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)5 `! {0 Q4 Q) x* K! O Q; n3 j4 a
- $inner_worker->reusePort = true;8 l; I5 d: J0 W3 E1 j
- $inner_worker->onMessage = 'on_message';5 _: c, B- h2 s
- // 执行监听。正常监听不会报错
# R; i" s# K, V4 Z9 J& k; W - $inner_worker->listen();
6 A: j! [6 o9 z; _+ s% k - };) i: Q4 Z1 G O1 b# F
- - Z9 y% b9 j% I* s3 {# R6 A+ I
- $worker->onMessage = 'on_message';
* G9 ]% O. M8 D* V1 C8 Q
: L: `! X# D' J1 c+ z4 m4 k$ q- function on_message($connection, $data)" Z* E) L/ Q/ ^
- {
% ?( v9 u4 B/ \- q' d6 D Y - $connection->send("hello\n");; F. Z7 x: ]" R2 r- r/ k
- }- A5 L* w7 ~$ @( B. Q& o: ]
# n ^" I. `6 t. j- // 运行worker5 W6 H4 C% x: L
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理: 1、建立一个websocket Worker,用来维持客户端长连接 2、websocket Worker内部建立一个text Worker 3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接 4、某个独立的php后台系统通过text协议与text Worker通讯 5、text Worker操作websocket连接完成数据推送 代码及步骤 push.php - <?php
0 N/ H) k$ W% k; f: d" ~ e - use Workerman\Worker;3 {* H3 A0 R4 N9 Q q1 V" U
- require_once './Workerman/Autoloader.php';
! t# p. [9 _+ h( X - // 初始化一个worker容器,监听1234端口. a2 }- |% Y; _4 Q
- $worker = new Worker('websocket://0.0.0.0:1234'); _& t5 G! [4 t
6 K9 P4 v! {) N2 V' _% l0 p- /*
5 [# a4 W# Q: ?, E, l - * 注意这里进程数必须设置为1,否则会报端口占用错误8 m8 G$ d4 {- s: @$ M
- * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
6 n) M9 O; x! T; m1 O | - */
! H, E8 S" d: a! m* B) l3 q. Z8 g - $worker->count = 1;
9 p/ ^( C5 ?! a9 _. ~8 @ - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口- E& v" m0 A; ~: E
- $worker->onWorkerStart = function($worker)
+ x8 O ^2 y' t) c2 p8 r - {; K, v. ?& ]' A8 E: Q
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
3 K: I" d: ?% H - $inner_text_worker = new Worker('text://0.0.0.0:5678');
/ a0 D2 L; E. V - $inner_text_worker->onMessage = function($connection, $buffer)
% t7 Y( a* Y5 W, q* P$ n, p2 l3 w7 b& x - {
* L/ Q. _3 B6 i8 n6 o - // $data数组格式,里面有uid,表示向那个uid的页面推送数据& V7 m" N1 @$ z9 W) M4 l
- $data = json_decode($buffer, true);
' Q; Y' s- V9 a9 b. n - $uid = $data['uid'];/ P& w9 S, X7 t) ]2 H# f3 ^& j
- // 通过workerman,向uid的页面推送数据
9 a0 }! l8 E- c- ~/ I/ @/ Y9 C0 } - $ret = sendMessageByUid($uid, $buffer);+ ]) C9 _1 n6 p" F1 ?$ F1 b
- // 返回推送结果! k2 G2 {2 `8 s+ |* ]$ W O+ g
- $connection->send($ret ? 'ok' : 'fail');
9 A w4 ^' C# p% @9 n" Q% Z0 [1 K! J6 d - };
0 x7 L4 F7 y8 }7 B9 Z q - // ## 执行监听 ##
- F$ A, s' h1 o( G3 k7 o& d/ ~ - $inner_text_worker->listen();) u7 M2 B' P! p& g7 j0 S
- };9 Q9 N- o6 l9 g0 T
- // 新增加一个属性,用来保存uid到connection的映射- j% M/ O) n4 w, Y/ m7 p
- $worker->uidConnections = array();
! {: j- R L" F* v* V5 p - // 当有客户端发来消息时执行的回调函数& q$ ]% P: h7 |! X5 n
- $worker->onMessage = function($connection, $data)
; v! D+ S3 ^- L: U - {
" U& k5 F) p: u$ Y - global $worker;
6 ]0 s1 Q: i1 T- X - // 判断当前客户端是否已经验证,既是否设置了uid8 h, T3 U+ A( L& w; s9 r) _& P
- if(!isset($connection->uid))
1 u( U8 i4 d$ c6 e. ] - {8 }* E8 o9 ?: U5 O4 i* u
- // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
8 V) l9 ~% }1 n$ L: q& G& b - $connection->uid = $data;
: _6 f7 L; c% T8 X3 h+ O2 Q1 ^: l - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
/ l6 Y) c0 t/ c1 }9 l5 `8 l: a - * 实现针对特定uid推送数据
+ ]* w! L. @8 m* O$ E p - */9 \; I9 s# b6 v: [
- $worker->uidConnections[$connection->uid] = $connection;/ C4 H( p: ?5 p+ g l. K; F2 w
- return;& ]2 Q$ R6 ~+ k: p" h' h
- }
* M, U7 a1 R- J Q X+ j# q - };, n( j+ O! S# b# h2 m
- * L5 l- E% n4 s( [( p
- // 当有客户端连接断开时
# i- a4 t% g P; k( @ G3 F - $worker->onClose = function($connection)
& X) }2 V3 s) S - {) r O9 L: I) K- G1 j4 p1 n
- global $worker;6 y& v2 t+ u, L" |, n8 S
- if(isset($connection->uid))+ l5 i: o( {- q
- {
& `# C7 \" l& ~' Y - // 连接断开时删除映射
, _$ W4 S% i* W) y - unset($worker->uidConnections[$connection->uid]);, Z. b3 K1 G, w- l! c
- }# K9 R! ]5 W6 w, A: b# }! W% V
- };; ]; `5 v( _) r; n: W+ j
- & f( D6 r; ?' N2 k" a
- // 向所有验证的用户推送数据; |- h: U" ^+ m$ f
- function broadcast($message)
# k+ V4 U1 H D [2 N: L - {
P+ R9 q" ~0 O: k* ]4 D5 m - global $worker;
}4 g( y9 W& S0 k - foreach($worker->uidConnections as $connection)9 b. [ Y) Z& S8 P, N" ^
- {9 X) V4 ^1 Q+ q
- $connection->send($message);
4 \/ Z/ ^* {5 Y) K - }
/ X4 g& Q X' _- p+ ? - }
3 ~' J5 R$ c, D4 v6 v7 I, V
/ T& d* R; M1 @$ F8 P- // 针对uid推送数据3 w+ i# @6 K0 {- A# S: o) [
- function sendMessageByUid($uid, $message)
% x4 e7 g# K. r4 W+ p F& V8 @ - {/ \& {3 \7 ~% U2 B+ n
- global $worker;
% X9 w' F% k6 v8 W8 R - if(isset($worker->uidConnections[$uid]))- b# b' x& x/ s) _
- {% K! t$ Y( K) G1 z1 [
- $connection = $worker->uidConnections[$uid];
- z" P' a' |0 N/ _ - $connection->send($message);: k9 y% d$ j% r. _
- return true;
4 N! y: s% k& e$ a' x# s o - }
K+ }9 L( P3 V$ `5 T - return false;1 f' a- a* e' j8 {- ?
- }$ Y2 R9 g9 R$ L
- ( Z. E2 C. |) r' y
- // 运行所有的worker
. I+ c/ B9 _# u8 b. O9 D& s - Worker::runAll();
复制代码启动后端服务 php push.php start -d 前端接收推送的js代码 - var ws = new WebSocket('ws://127.0.0.1:1234');! q- m6 _7 L# V4 Z8 x; B
- ws.onopen = function(){
- {8 I g. u {4 v! r0 M( R - var uid = 'uid1';
# m w* J5 T6 _/ ]; O0 A3 n - ws.send(uid);! x G$ p2 _9 [3 i7 R; y
- };
& Q- {3 ` q: C5 P7 w) N, N - ws.onmessage = function(e){
], A4 `; P& }+ ^3 j! g - alert(e.data);9 H J( r& U6 e# `) r
- };
复制代码后端推送消息的代码 - // 建立socket连接到内部推送端口
- ~ X$ L [7 o% i5 L - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
, D$ W0 |7 a4 u$ q+ \, ~! ]& r% ~$ B - // 推送的数据,包含uid字段,表示是给这个uid推送% a4 {, F0 J+ L' l+ _
- $data = array('uid'=>'uid1', 'percent'=>'88%');
# u, C9 b% Z5 U - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
+ G9 x. _# h. b& I9 b - fwrite($client, json_encode($data)."\n");! V* Y5 R+ Y) j, I6 M/ t, @
- // 读取推送结果5 j4 F! T7 X$ K" Q$ C* }+ {
- echo fread($client, 8192);
复制代码
* f/ q0 Z9 F- }6 ]3 T6 w) {7 E; t, I: g3 a
|