cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;
* J4 d! Q: u% n* m( C3 M - require_once __DIR__ . '/Workerman/Autoloader.php';
% d' b: p; b) {# I4 w P - 9 E8 v) A. \: c( A1 K: B1 q- S
- $worker = new Worker();/ S7 R6 H t7 `8 O' x+ p: C
- // 4个进程
: b" O6 h/ S$ O - $worker->count = 4;
4 H( i+ c+ t* a* G7 _( [ - // 每个进程启动后在当前进程新增一个Worker监听
" ?3 G/ q" W3 K2 S* v3 J - $worker->onWorkerStart = function($worker)% F, x/ w7 Q# f4 I0 m8 D+ H
- {
! p8 x( k$ a& X( c* Z* @ - /**
5 b# [- l/ p/ M3 { - * 4个进程启动的时候都创建2016端口的Worker
r+ o7 Q( ]. z& Q- W - * 当执行到worker->listen()时会报Address already in use错误. E7 @) E1 b( \6 e* i6 k+ d G
- * 如果worker->count=1则不会报错
6 i; Z! a# ? Q$ c( @" `7 N7 p - */
: X, i. G0 q! m - $inner_worker = new Worker('http://0.0.0.0:2016');% D6 m/ A7 n( ?
- $inner_worker->onMessage = 'on_message';/ z/ {7 |6 U- f: F3 W0 n. V1 h
- // 执行监听。这里会报Address already in use错误8 O$ Q6 O5 j5 x0 P% {
- $inner_worker->listen();% F, ]3 K; s* z
- };
/ e# D7 M, k2 ]& O - 3 E( x- ?1 p9 y- \# c; ~
- $worker->onMessage = 'on_message';
4 f4 n) x/ B8 E9 p! t - ) I; E$ F7 L# v% |
- function on_message($connection, $data) `6 z9 o! n+ _- V M, J u
- {
" D! T# I0 C( |; N; v9 ~ - $connection->send("hello\n");
$ b: J, s# ]1 T5 I8 Y - }
6 {0 f1 p) V) y+ ~- g3 [! N
- [2 W1 P+ Z; x. n- // 运行worker
5 h- D9 X. g" l2 Q - Worker::runAll();& z; _3 H C n
- 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
- y1 i0 J% I: \$ X l4 h: r
7 s7 ]. ]1 L. }" o) R+ @- use Workerman\Worker;. b. f% Q% Z; f& r% l F
- require_once './Workerman/Autoloader.php';
% n- K# s9 M, N' e5 R - % v! p! O. g1 m7 G
- $worker = new Worker('text://0.0.0.0:2015');
7 p& ]9 m! ~4 c* [+ m - // 4个进程
4 A' Q. a4 l0 p1 E" w - $worker->count = 4;
4 H' N4 s! J h. N6 Z1 b) L1 w - // 每个进程启动后在当前进程新增一个Worker监听9 p, g+ o) u8 R L+ w2 ~
- $worker->onWorkerStart = function($worker)
" R+ b- I; U0 a/ O* U2 d- N - {, H" t5 j6 J" v9 k* M1 h
- $inner_worker = new Worker('http://0.0.0.0:2016');) I W. `0 O; H% N
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0). K$ J% M$ U" @1 L* r2 Y
- $inner_worker->reusePort = true;
B( y9 y+ U8 g$ s. T j- x - $inner_worker->onMessage = 'on_message';
# q1 o/ b" T! ^- }* h$ T - // 执行监听。正常监听不会报错
% u# {; j* [8 P% l& ~- W8 W - $inner_worker->listen();
, O- I9 C/ u+ u6 b" O6 s) k - };8 l4 k1 n! X" f; g1 s7 M9 F2 i- Y
! Y2 q' {) l: g. ~7 Q) S9 G- $worker->onMessage = 'on_message';
; Q q u. ~( p u* r - 7 }- v8 q% p' i3 m# q* E# y% [
- function on_message($connection, $data)" b1 k' V! C# l4 |& d8 g# @+ s
- {
& m7 X$ \/ F# e1 h - $connection->send("hello\n");3 ?( i* p* I* ]% e7 H7 t2 `! E* ^
- }
* i# s9 H/ D+ P9 G - 8 y! c2 R7 ]% o& W
- // 运行worker4 }. }8 t/ R- @* w2 a
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php: R' r& t, Z' T7 G1 k
- use Workerman\Worker;8 C! {9 d; ]+ M, j# M- K
- require_once './Workerman/Autoloader.php';4 {5 B, t/ l( D$ R0 ~
- // 初始化一个worker容器,监听1234端口6 b1 l3 ^, [# q, F4 Q9 E$ u, |
- $worker = new Worker('websocket://0.0.0.0:1234');7 a. X4 N8 ?2 y& ^* }
- ! q# `5 m+ X; ~1 s9 d+ o
- /*. G+ w+ b. V/ ]5 d$ i
- * 注意这里进程数必须设置为1,否则会报端口占用错误
+ ]2 A: }, }4 y5 s- Z+ k - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true) s: u4 K1 ~/ ]4 d/ V$ D
- */
' \. m4 p% l5 P& Z* L - $worker->count = 1;
* C2 F& J( G! V0 S2 j+ ] - // worker进程启动后创建一个text Worker以便打开一个内部通讯端口, m; F; u) | V$ ?% H) w9 C4 H+ s
- $worker->onWorkerStart = function($worker)
. X! R- |$ S1 P - {
X3 ]4 {, A) ]$ w S' j1 e* b0 { - // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
7 f- x& p' _0 Z! b- O" E - $inner_text_worker = new Worker('text://0.0.0.0:5678');
6 K' C/ z/ l1 r2 w7 B - $inner_text_worker->onMessage = function($connection, $buffer)9 d {8 f f, H2 E, i! ?
- {
* E; E& I7 ~) _+ e - // $data数组格式,里面有uid,表示向那个uid的页面推送数据6 T0 ?8 v0 Y. x$ r: h3 U
- $data = json_decode($buffer, true);
( d% p4 t5 w4 _* l - $uid = $data['uid'];
+ q k% L9 e4 D; t5 ?% \$ o - // 通过workerman,向uid的页面推送数据( q: F: G# y) w8 h, R" I; Q* i
- $ret = sendMessageByUid($uid, $buffer);
3 g/ K' e7 n, ~1 b4 q: F. g - // 返回推送结果
) q) G5 D) E: w# D - $connection->send($ret ? 'ok' : 'fail');: o8 m* P7 k( E& O/ Q# J" ~! z
- };% f% f0 g9 L7 @
- // ## 执行监听 ##
+ E2 h8 n8 f. U' Q - $inner_text_worker->listen();
' h% z6 @% q- m* \ - };+ y# d6 P9 Y" @- x" W# R$ r+ g
- // 新增加一个属性,用来保存uid到connection的映射; u% u4 G: P( y8 ^9 G: m a: |
- $worker->uidConnections = array();
. G8 \$ o5 G/ d - // 当有客户端发来消息时执行的回调函数/ W$ ~+ @- Q& h3 J; b; k
- $worker->onMessage = function($connection, $data)! `; b1 h- L# B5 A8 y& u
- {& i( K5 u' t& U# m. w
- global $worker;% h' E3 I4 A2 J
- // 判断当前客户端是否已经验证,既是否设置了uid4 W! T# F; o" I, o) e( U; G
- if(!isset($connection->uid))8 B6 k, R0 i \) ]
- {
* ^+ t; S( C# k6 }. y; r) B, s3 t/ J - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证). D: n3 D, S1 T3 O. E- e8 |" E( W
- $connection->uid = $data;
4 z$ l! a& B; B9 `5 P - /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,; H/ p9 L* B; e0 ]! X
- * 实现针对特定uid推送数据
R5 ^$ ~. q" `/ J1 X b' z - */
& C: k+ { b3 @- a- h2 r% q - $worker->uidConnections[$connection->uid] = $connection;' ^( g5 w! D! Z3 {+ a
- return;
8 Y6 v: B3 n/ g) j/ j - }+ C3 j% q" x0 @+ L7 V; z* [1 @
- };
0 V: X! e7 o% V) {, } - * L% c6 [: d: k x: ~
- // 当有客户端连接断开时: {) E+ h7 ]7 H8 n! Y8 K
- $worker->onClose = function($connection)" ^) o, o8 g# w2 F! E0 i' G+ }
- {0 w0 x) Y& h0 M, ` b+ f' L
- global $worker;
7 |9 q, y- d6 w3 c4 b$ m - if(isset($connection->uid))4 ^* X% j# S V, U( \3 t
- {
& F9 J9 s! \ D1 B; b: D$ |' ~. J - // 连接断开时删除映射
5 _! c- \4 D0 X. P - unset($worker->uidConnections[$connection->uid]);. ]) P2 J; d- a B; d; I
- }
' u3 A4 ?( x, z: D7 E# l - };
. j i5 N' c4 D1 s: i2 v, G/ h
O& ~" F P; r4 p( P1 S9 G0 ?- // 向所有验证的用户推送数据/ d8 L' d) C8 i+ E% ^* F
- function broadcast($message)
$ C9 z) h3 Q( V/ n+ Y - {& z1 c5 G+ D6 z1 D9 G9 W5 U( T
- global $worker;
5 @0 ~# ]3 c4 y - foreach($worker->uidConnections as $connection), \- q% }0 _5 m/ g$ |" G
- {
+ Q" `5 a' W1 L4 _& S9 J3 U - $connection->send($message);& N. }6 l: l: u3 s& L5 h# r" c$ e
- }5 m4 [3 d3 {; O" c
- }3 T/ ? Y( T# N! x
8 p' Y5 X+ _* D" t" ^% A, ^, k- // 针对uid推送数据
9 i y3 l1 d: ?' K" ^ - function sendMessageByUid($uid, $message)% C8 W3 Z) U$ ~7 M( Y& O
- {
# V& W8 l, z+ @8 C V# \ - global $worker;
) M* @" C6 O8 o7 M - if(isset($worker->uidConnections[$uid]))6 @2 ]+ E4 B! ]. x4 s. N5 w
- {- s1 o1 u& _! M6 L5 {
- $connection = $worker->uidConnections[$uid];
1 l- e5 _; ~# t3 |( D& i# ~9 N - $connection->send($message);2 t$ g- h- p+ c+ ]$ s9 e
- return true;
% t; d* `5 @8 J+ p( C - }- J2 r& ]* D5 D7 w E& x( W7 R+ v
- return false;) h! e, c5 ~( T' H0 ^; x; a9 y
- }
2 F1 p. C7 z1 L' `6 b
& \6 Z! \; W& w1 F$ n7 z Q- // 运行所有的worker5 B; ^4 F7 m7 j
- Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');
; E+ g, g4 D' o! O4 \2 I8 o - ws.onopen = function(){ J ^& s7 ~0 V4 p4 E8 v/ Y
- var uid = 'uid1';
{/ ^/ q7 U. R6 W5 J1 R! h' M - ws.send(uid);
2 B( I$ v( ~9 A; }. F! p7 l% @ - };: e, S0 D1 d: k% M% n8 Z! Y
- ws.onmessage = function(e){
4 j! P0 d0 h5 _ - alert(e.data);
1 R: [2 E& w4 L - };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口
5 X" Y% ^6 ?, y% I8 Y - $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);8 O) `5 ?% C3 m: a( ~- W
- // 推送的数据,包含uid字段,表示是给这个uid推送# ?8 m i- a+ H2 ?' I$ D' i2 v
- $data = array('uid'=>'uid1', 'percent'=>'88%');
8 [2 X- a7 l( e3 d) [2 ^& h - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符4 i' o+ ~5 }- Z1 |
- fwrite($client, json_encode($data)."\n");
9 A8 P& [& I4 m: \' U6 \ - // 读取推送结果
$ Z2 |% ~. W% j+ G8 n3 W% e5 G - echo fread($client, 8192);
复制代码 6 ]/ x! ~' |9 {( W) y7 i2 W
0 l3 O" W0 R, {$ w
| 欢迎光临 cncml手绘网 (http://bbs.cncml.com/) |
Powered by Discuz! X3.2 |