cncml手绘网
标题: 用于实例化Worker后执行监听 [打印本页]
作者: admin 时间: 2018-12-17 21:22
标题: 用于实例化Worker后执行监听
- void Worker::listen(void)
复制代码用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
- use Workerman\Worker;
2 v0 m8 I) g( E& h - require_once __DIR__ . '/Workerman/Autoloader.php';
1 }4 }, c* P4 s5 j
. p7 }7 I$ p* {5 `9 \6 |- $worker = new Worker();( }6 K S/ ~$ W9 U' y( J5 Y+ J0 _
- // 4个进程) k# s+ ~* f& ]+ |+ N- J# s
- $worker->count = 4;
8 ?* k' a, W8 e. S- ~- s( E - // 每个进程启动后在当前进程新增一个Worker监听0 g. C/ \. c* `6 o0 k6 p$ T' A- V v0 K
- $worker->onWorkerStart = function($worker)
6 F6 c* V5 p0 T. p0 O$ m5 A7 j - {
, j( E2 Y: Q. C1 C; d - /**
l0 f% k# P" J, J( {# L. ?1 r - * 4个进程启动的时候都创建2016端口的Worker
( B: ]; _9 ]) m; I - * 当执行到worker->listen()时会报Address already in use错误( L- f2 T1 o. ~& P3 M1 W0 O
- * 如果worker->count=1则不会报错
8 Z! j) Y) C; S3 L) p! I5 @8 q - */. [' l* u# u# U C A
- $inner_worker = new Worker('http://0.0.0.0:2016');. {& ~# F5 R& w! [5 E4 M7 [! ~- D. l
- $inner_worker->onMessage = 'on_message';( r( I$ w4 c& S& p* G7 e- k C+ q
- // 执行监听。这里会报Address already in use错误
L3 o: p( Q) d1 b- T2 x - $inner_worker->listen();: e% o. |5 F! ]2 a* U( {
- };( n! ^, T# @9 B' N+ U: \# I: O
- 8 a6 P! X# {1 F( f
- $worker->onMessage = 'on_message';! y6 V2 S' r& Y# [4 }* l/ c
- 8 Y7 R6 \, a/ `4 W x
- function on_message($connection, $data)
7 \% a( h/ ?3 j$ }- t# m/ @1 c - {, j8 L' Y, ?; J# G" ?: A2 O
- $connection->send("hello\n");( v% d- g, V s7 v2 _* E
- }* F, W2 N. Q" s% U6 W& @
: U! G6 p0 D1 c- // 运行worker
' U6 c3 V, c# |6 R - Worker::runAll();
1 @/ ?0 {( ?" n5 {" _ - 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
3 V* t2 B+ P2 i' B! M8 Q - / T+ [; ^3 w, [6 v) q" _# D( @1 }
- use Workerman\Worker;8 o0 D# Z: y _6 `) g6 I
- require_once './Workerman/Autoloader.php';
p6 O X i" W
% E ?* K: r+ J7 }% I& k( Q6 k6 F- $worker = new Worker('text://0.0.0.0:2015');% x0 r+ _# O' ~1 ?. B
- // 4个进程
; ^" O5 x8 X& Q% K - $worker->count = 4;1 {# ^! g' S" A6 \* X
- // 每个进程启动后在当前进程新增一个Worker监听# {$ v2 v0 _ r( Y$ B+ F
- $worker->onWorkerStart = function($worker)
' L' C% R2 N" Z' w8 V- _+ K9 r - {
. T) E) F( ?; | x2 a6 ?! F( } - $inner_worker = new Worker('http://0.0.0.0:2016');/ z1 y9 [( p9 u( L( \9 j
- // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)
; P5 d) [; b+ { - $inner_worker->reusePort = true;, I; x$ g) X# M% u5 V
- $inner_worker->onMessage = 'on_message';
/ M6 W; s& c$ H - // 执行监听。正常监听不会报错
7 i F; h# x' F5 ?, q `$ K - $inner_worker->listen();% G3 M! D$ |6 k
- };5 k7 x3 R& b# u& H
7 K. r. e/ I+ g$ v% h; V" O( Y- $worker->onMessage = 'on_message';; Y1 d# ^/ [' U' R. {- Y
- 5 Y1 X6 C) m* j
- function on_message($connection, $data)
) L" X9 @+ g0 ~5 N) M. k; b. @ - {, z: B' O) G9 J
- $connection->send("hello\n");* U/ o5 ` C8 V
- }5 O# W# H! H" U! A( \5 i
- $ | X: {6 G5 e; \+ k5 z
- // 运行worker. Z' C# p6 n# |0 l T5 P9 {
- Worker::runAll();
复制代码 示例 php后端及时推送消息给客户端原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
- <?php- {( p5 \, Q, X$ C5 s
- use Workerman\Worker;
0 k+ s/ w7 A L+ Z - require_once './Workerman/Autoloader.php';6 ~& h9 I: V1 I" E8 W
- // 初始化一个worker容器,监听1234端口
* O3 E9 s) I% B - $worker = new Worker('websocket://0.0.0.0:1234');
' ?0 p1 o# C) @8 |* I - 3 Q9 N1 w2 h: V9 J0 E& B
- /*
: M6 e$ k+ Z0 k' \ L% x4 h( [' m9 \ - * 注意这里进程数必须设置为1,否则会报端口占用错误
7 P5 K( a, [4 c2 l0 R - * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)3 ~5 u, [. n0 ]1 X
- */
( {1 g$ p |+ q, o - $worker->count = 1;. q+ y. r: y# {2 L$ W
- // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 ~5 X6 ]7 f& D
- $worker->onWorkerStart = function($worker)( q+ O0 E. }, j- u2 M. ^$ A9 ^
- {. K% G( D) Z' ?# |# S. k
- // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符7 W. u/ v8 ~& p7 S( W# t; a% H
- $inner_text_worker = new Worker('text://0.0.0.0:5678');
! P1 C8 V) L w' b2 \6 Y - $inner_text_worker->onMessage = function($connection, $buffer)7 C; V+ ?! i; J! Z, J% m: k
- {
n: h: o9 K( T6 m, p" v' e - // $data数组格式,里面有uid,表示向那个uid的页面推送数据: \. r5 o: ^# q+ Z0 n" l% t
- $data = json_decode($buffer, true);2 o }4 { U7 Z" p `
- $uid = $data['uid'];& I9 W: |& C1 u; w0 c" G4 R4 }+ y
- // 通过workerman,向uid的页面推送数据
) m0 P+ ~/ |9 `. m1 V) M0 U9 o, G2 M - $ret = sendMessageByUid($uid, $buffer);. k0 }1 {/ C% A8 u' R( D
- // 返回推送结果8 C5 V, d. n3 Y9 x, w
- $connection->send($ret ? 'ok' : 'fail');. I8 j3 H. ?! Q# s0 a! M% n( w
- };2 a0 q% u: E/ K# [& p
- // ## 执行监听 ##$ S: }3 J) `9 f% K' \
- $inner_text_worker->listen();2 S: C* C% k E$ W# r% d
- };
/ E) G* u7 o5 C& O( w A - // 新增加一个属性,用来保存uid到connection的映射
8 T# V# a3 z8 ]% N - $worker->uidConnections = array();8 E! T( x( g4 a" t9 I+ A" [
- // 当有客户端发来消息时执行的回调函数
2 ~4 H3 i, u3 [7 Z; d+ s' U - $worker->onMessage = function($connection, $data)
! t/ t, e' a8 Z+ _- s - {! f+ u. B3 ^' r
- global $worker;6 J8 m# x+ g1 d
- // 判断当前客户端是否已经验证,既是否设置了uid
; n- b0 i, c4 u' j1 ~. C# w) ?* @ - if(!isset($connection->uid))
1 I; G, A. X* | - {
l, x3 c+ i r" W4 }; u" o - // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)# _4 X& {, J7 M+ L+ q+ V
- $connection->uid = $data;2 `8 J7 d2 V( P8 O2 K7 i) c
- /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
/ t3 l" H- N! W - * 实现针对特定uid推送数据/ ~9 R* ~3 K6 I4 n
- */6 E! g/ G" ~ d0 p' j
- $worker->uidConnections[$connection->uid] = $connection;: z) O/ G. J% `+ M+ h: v) N
- return;# `4 k' s: O1 _
- }3 d) M8 A; V: k, T# P
- };- E) K- H& X% @: S: {
1 t8 b$ v) X7 k' o, s% U+ i7 R+ K- // 当有客户端连接断开时
4 t y9 [. l9 f( q/ \ - $worker->onClose = function($connection)0 I3 K5 ^+ }1 u
- {$ {2 d* z& m0 V: m+ }# ?- ~$ g. d
- global $worker;4 l2 y/ h+ B9 T* x
- if(isset($connection->uid))
. B- u0 o6 y2 R6 \ f - {4 |/ V9 G3 x2 C
- // 连接断开时删除映射3 d" e8 M* p3 O! \
- unset($worker->uidConnections[$connection->uid]);+ t- G; }) L8 f
- }
% [: |. {& J, } - };5 u, q) \- b$ D! H
7 ^( L3 D3 c8 u" i$ R- // 向所有验证的用户推送数据
' T5 }; X" j0 \3 `! a" n( t' N3 X - function broadcast($message)- i1 Y" k9 m. o' o$ ], ?8 j
- {
9 }( U; E) P5 Q5 K - global $worker;
8 u( C. G3 p# I( w - foreach($worker->uidConnections as $connection)
/ c' Z- w! |/ F4 u3 |8 W - {
) p [" A! D8 p9 S$ K - $connection->send($message);
& K. m' N: a6 a7 B - }* A; o- ^3 K$ U% V0 b
- }
/ Y( Y3 T$ N' a - & U* {# Y* C7 Y! E7 K3 c% ^: z; s
- // 针对uid推送数据
" g- S& h: O" Z- v - function sendMessageByUid($uid, $message)
& E4 i1 H9 X4 J0 E6 C- H6 x2 f - {
1 ^8 I9 X6 }$ {% e9 L4 x - global $worker;- W. e2 H; v2 ~" \6 _/ [- ]
- if(isset($worker->uidConnections[$uid]))2 S! N) I0 B5 d' j
- {6 i* @ r2 W4 P2 ~
- $connection = $worker->uidConnections[$uid];* A8 [4 W" f' A" E( V( k7 t9 X3 J
- $connection->send($message);
1 L0 a8 [- G' A' T - return true;
' d2 s+ r; }8 e3 S6 l+ U - }
& M" N/ b# n* C" @9 b! b, L5 M - return false;
+ k; w! U0 H- @: F0 T - }; s/ X7 Z8 N: @/ T; k
" d. B* j% g8 Z C& e- H- // 运行所有的worker
2 z! k, z0 _) O* L - Worker::runAll();
复制代码启动后端服务 php push.php start -d
前端接收推送的js代码
- var ws = new WebSocket('ws://127.0.0.1:1234');" s6 {) d* d" F1 I( ^5 P
- ws.onopen = function(){
8 [0 N5 R9 {5 A" U. z0 g - var uid = 'uid1';6 V" V$ x" s2 D. @
- ws.send(uid);: F; O* R. s) l
- };, ^) e4 |" k" Z B7 V
- ws.onmessage = function(e){
2 {7 m) W( A. R) K Z - alert(e.data);
0 v3 g. r, R" |% X0 N - };
复制代码后端推送消息的代码
- // 建立socket连接到内部推送端口0 {) G6 [* S0 I+ ^: s
- $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
- _' z9 s- q& h% k' |3 S - // 推送的数据,包含uid字段,表示是给这个uid推送
9 _; i W; t7 G2 S) `9 R6 z - $data = array('uid'=>'uid1', 'percent'=>'88%');
* J0 m& d9 F0 H - // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
3 N! O% z0 t# D4 Z) S& k+ } - fwrite($client, json_encode($data)."\n");
& @( ^* k2 _3 T3 V - // 读取推送结果
7 w% ]" L0 X- r* j; N3 M0 K - echo fread($client, 8192);
复制代码
* V3 c( [* q/ J1 f5 n
: J! n2 P; T' k$ z6 r" L5 A
欢迎光临 cncml手绘网 (http://bbs.cncml.com/) |
Powered by Discuz! X3.2 |